Skip to content

Commit c1dc10e

Browse files
committed
Refactor test_concur_concurrent.cpp to add dirty write test
1 parent 7e1731c commit c1dc10e

File tree

4 files changed

+128
-24
lines changed

4 files changed

+128
-24
lines changed

include/libconcur/concurrent.h

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -325,8 +325,8 @@ struct producer<trans::broadcast, relation::single> {
325325
hdr.w_idx.fetch_add(1, std::memory_order_release);
326326
// Set data & flag.
327327
elem.set_flag(w_idx | state::enqueue_mask);
328-
elem.set_data(std::forward<U>(src)); // Here should not be interrupted.
329-
elem.set_flag(w_idx | state::commit_mask);
328+
elem.set_data(std::forward<U>(src));
329+
elem.set_flag(w_idx);
330330
return true;
331331
}
332332
};
@@ -336,43 +336,44 @@ template <>
336336
struct producer<trans::broadcast, relation::multi> {
337337

338338
struct header_impl {
339-
std::atomic<state::flag_t> w_flags {0}; ///< write flags, combined current and starting index.
340-
private: padding<decltype(w_flags)> ___;
339+
std::atomic<std::uint64_t> w_contexts {0}; ///< write contexts, combined current and starting index.
340+
private: padding<decltype(w_contexts)> ___;
341341

342342
public:
343343
void get(index_t &idx, index_t &beg) const noexcept {
344-
auto w_flags = this->w_flags.load(std::memory_order_relaxed);
345-
idx = get_index(w_flags);
346-
beg = get_begin(w_flags);
344+
auto w_contexts = this->w_contexts.load(std::memory_order_relaxed);
345+
idx = get_index(w_contexts);
346+
beg = get_begin(w_contexts);
347347
}
348348
};
349349

350350
template <typename T, typename H, typename C, typename U,
351351
verify_elems_header<H> = true,
352352
convertible<H, header_impl> = true>
353353
static bool enqueue(::LIBIMP::span<element<T>> elems, H &hdr, C &/*ctx*/, U &&src) noexcept {
354-
auto w_flags = hdr.w_flags.load(std::memory_order_acquire);
354+
auto w_contexts = hdr.w_contexts.load(std::memory_order_acquire);
355355
index_t w_idx;
356356
for (;;) {
357-
w_idx = get_index(w_flags);
358-
auto w_beg = get_begin(w_flags);
357+
w_idx = get_index(w_contexts);
358+
auto w_beg = get_begin(w_contexts);
359359
// Move the queue head index.
360360
if (w_beg + hdr.circ_size <= w_idx) {
361361
w_beg += 1;
362362
}
363-
// Update flags.
364-
auto n_flags = make_flags(w_idx + 1/*iterate backwards*/, w_beg);
365-
if (hdr.w_flags.compare_exchange_weak(w_flags, n_flags, std::memory_order_acq_rel)) {
363+
// Update write contexts.
364+
auto n_contexts = make_w_contexts(w_idx + 1/*iterate backwards*/, w_beg);
365+
if (hdr.w_contexts.compare_exchange_weak(w_contexts, n_contexts, std::memory_order_acq_rel)) {
366366
break;
367367
}
368368
}
369369
// Get element.
370370
auto w_cur = trunc_index(hdr, w_idx);
371371
auto &elem = elems[w_cur];
372-
// Set data & flag.
372+
// Set data & flag. Dirty write is not considered here.
373+
// By default, when dirty write occurs, the previous writer must no longer exist.
373374
elem.set_flag(w_idx | state::enqueue_mask);
374-
elem.set_data(std::forward<U>(src)); // Here should not be interrupted.
375-
elem.set_flag(w_idx | state::commit_mask);
375+
elem.set_data(std::forward<U>(src));
376+
elem.set_flag(w_idx);
376377
return true;
377378
}
378379

@@ -387,8 +388,8 @@ struct producer<trans::broadcast, relation::multi> {
387388
return index_t(flags >> (sizeof(index_t) * CHAR_BIT));
388389
}
389390

390-
static constexpr state::flag_t make_flags(index_t idx, index_t beg) noexcept {
391-
return state::flag_t(idx) | (state::flag_t(beg) << (sizeof(index_t) * CHAR_BIT));
391+
static constexpr std::uint64_t make_w_contexts(index_t idx, index_t beg) noexcept {
392+
return std::uint64_t(idx) | (std::uint64_t(beg) << (sizeof(index_t) * CHAR_BIT));
392393
}
393394
};
394395

@@ -426,16 +427,16 @@ struct consumer<trans::broadcast, relation::multi> {
426427
}
427428
// Try getting data.
428429
for (;;) {
429-
if ((f_ct & state::enqueue_mask) == state::enqueue_mask) {
430+
if (f_ct & state::enqueue_mask) {
430431
return false; // unreadable
431432
}
432433
des = LIBCONCUR::get(elem);
433434
// Correct data can be obtained only if
434435
// the elem data is not modified during the getting process.
435436
if (elem.cas_flag(f_ct, f_ct)) break;
436437
}
437-
ctx.w_lst = (f_ct & ~state::enqueue_mask) + 1;
438438
// Get a valid index and iterate backwards.
439+
ctx.w_lst = index_t(f_ct) + 1;
439440
ctx.r_idx += 1;
440441
return true;
441442
}

include/libconcur/element.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ using flag_t = std::uint64_t;
2727
enum : flag_t {
2828
invalid_value = ~flag_t(0),
2929
enqueue_mask = invalid_value << 32,
30-
commit_mask = ~flag_t(1) << 32,
3130
};
3231

3332
} // namespace state

test/concur/test_concur_concurrent.cpp

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
#include "libimp/log.h"
1414
#include "libimp/nameof.h"
1515

16+
#include "test_util.h"
17+
1618
TEST(concurrent, cache_line_size) {
1719
std::cout << concur::cache_line_size << "\n";
1820
EXPECT_TRUE(concur::cache_line_size >= alignof(std::max_align_t));
@@ -311,4 +313,61 @@ TEST(concurrent, broadcast) {
311313

312314
/// \brief 8-8
313315
test_broadcast<prod_cons<trans::broadcast, relation::multi , relation::multi>>(8, 8);
314-
}
316+
}
317+
318+
TEST(concurrent, broadcast_multi_dirtywrite) {
319+
using namespace concur;
320+
321+
struct data {
322+
std::uint64_t n{};
323+
324+
data &operator=(test::latch &l) noexcept {
325+
l.arrive_and_wait();
326+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
327+
n = 1;
328+
return *this;
329+
}
330+
331+
data &operator=(data const &rhs) noexcept {
332+
n = rhs.n;
333+
return *this;
334+
}
335+
};
336+
337+
element<data> circ[2] {};
338+
prod_cons<trans::broadcast, relation::multi, relation::multi> pc;
339+
typename traits<decltype(pc)>::header hdr {imp::make_span(circ)};
340+
341+
auto push_one = [&, ctx = typename concur::traits<decltype(pc)>::context{}](auto &i) mutable {
342+
return pc.enqueue(imp::make_span(circ), hdr, ctx, i);
343+
};
344+
auto pop_one = [&, ctx = typename concur::traits<decltype(pc)>::context{}]() mutable {
345+
data i;
346+
if (pc.dequeue(imp::make_span(circ), hdr, ctx, i)) {
347+
return i;
348+
}
349+
return data{};
350+
};
351+
352+
test::latch l(2);
353+
std::thread t[2];
354+
t[0] = std::thread([&] {
355+
push_one(l); // 1
356+
});
357+
t[1] = std::thread([&] {
358+
l.arrive_and_wait();
359+
push_one(data{3});
360+
push_one(data{2}); // dirty write
361+
});
362+
363+
for (int i = 0; i < 2; ++i) {
364+
t[i].join();
365+
}
366+
std::set<std::uint64_t> s{1, 2, 3};
367+
for (int i = 0; i < 2; ++i) {
368+
auto d = pop_one();
369+
EXPECT_TRUE(s.find(d.n) != s.end());
370+
s.erase(d.n);
371+
}
372+
EXPECT_TRUE(s.find(3) == s.end());
373+
}

test/test_util.h

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,22 @@
11

22
#pragma once
33

4-
#include <sys/wait.h>
5-
#include <unistd.h>
4+
#include "libimp/detect_plat.h"
5+
#ifndef LIBIMP_OS_WIN
6+
# include <sys/wait.h>
7+
# include <unistd.h>
8+
#else
9+
# define pid_t int
10+
#endif
11+
12+
#include <condition_variable>
13+
#include <mutex>
614

715
namespace test {
816

917
template <typename Fn>
1018
pid_t subproc(Fn&& fn) {
19+
#ifndef LIBIMP_OS_WIN
1120
pid_t pid = fork();
1221
if (pid == -1) {
1322
return pid;
@@ -21,13 +30,49 @@ pid_t subproc(Fn&& fn) {
2130
exit(0);
2231
}
2332
return pid;
33+
#else
34+
return -1;
35+
#endif
2436
}
2537

2638
inline void join_subproc(pid_t pid) {
39+
#ifndef LIBIMP_OS_WIN
2740
int ret_code;
2841
waitpid(pid, &ret_code, 0);
42+
#endif
2943
}
3044

45+
/// \brief A simple latch implementation.
46+
class latch {
47+
public:
48+
explicit latch(int count) : count_(count) {}
49+
50+
void count_down() {
51+
std::unique_lock<std::mutex> lock(mutex_);
52+
if (count_ > 0) {
53+
--count_;
54+
if (count_ == 0) {
55+
cv_.notify_all();
56+
}
57+
}
58+
}
59+
60+
void wait() {
61+
std::unique_lock<std::mutex> lock(mutex_);
62+
cv_.wait(lock, [&] { return count_ == 0; });
63+
}
64+
65+
void arrive_and_wait() {
66+
count_down();
67+
wait();
68+
}
69+
70+
private:
71+
std::mutex mutex_;
72+
std::condition_variable cv_;
73+
int count_;
74+
};
75+
3176
} // namespace test
3277

3378
#define REQUIRE_EXIT(...) \

0 commit comments

Comments
 (0)