Skip to content

Commit ed8b1fd

Browse files
committed
fix some bugs for linux-mutex
1 parent a9cb81b commit ed8b1fd

File tree

9 files changed

+180
-125
lines changed

9 files changed

+180
-125
lines changed

include/libipc/shm.h

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,13 @@ enum : unsigned {
1515
open = 0x02
1616
};
1717

18-
IPC_EXPORT id_t acquire(char const * name, std::size_t size, unsigned mode = create | open);
19-
IPC_EXPORT void * get_mem(id_t id, std::size_t * size);
20-
IPC_EXPORT void release(id_t id);
21-
IPC_EXPORT void remove (id_t id);
22-
IPC_EXPORT void remove (char const * name);
18+
IPC_EXPORT id_t acquire(char const * name, std::size_t size, unsigned mode = create | open);
19+
IPC_EXPORT void * get_mem(id_t id, std::size_t * size);
20+
IPC_EXPORT std::int32_t release(id_t id);
21+
IPC_EXPORT void remove (id_t id);
22+
IPC_EXPORT void remove (char const * name);
2323

24-
IPC_EXPORT std::uint32_t get_ref(id_t id);
24+
IPC_EXPORT std::int32_t get_ref(id_t id);
2525
IPC_EXPORT void sub_ref(id_t id);
2626

2727
class IPC_EXPORT handle {
@@ -39,11 +39,11 @@ class IPC_EXPORT handle {
3939
std::size_t size () const noexcept;
4040
char const * name () const noexcept;
4141

42-
std::uint32_t ref() const noexcept;
42+
std::int32_t ref() const noexcept;
4343
void sub_ref() noexcept;
4444

4545
bool acquire(char const * name, std::size_t size, unsigned mode = create | open);
46-
void release();
46+
std::int32_t release();
4747

4848
void* get() const;
4949

src/libipc/platform/condition_linux.h

Lines changed: 26 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -50,33 +50,34 @@ class condition {
5050
if ((cond_ = acquire_cond(name)) == nullptr) {
5151
return false;
5252
}
53-
if (shm_.ref() == 1) {
54-
::pthread_cond_destroy(cond_);
55-
auto finally = ipc::guard([this] { close(); }); // close when failed
56-
// init condition
57-
int eno;
58-
pthread_condattr_t cond_attr;
59-
if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) {
60-
ipc::error("fail pthread_condattr_init[%d]\n", eno);
61-
return false;
62-
}
63-
IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy);
64-
if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) {
65-
ipc::error("fail pthread_condattr_setpshared[%d]\n", eno);
66-
return false;
67-
}
68-
*cond_ = PTHREAD_COND_INITIALIZER;
69-
if ((eno = ::pthread_cond_init(cond_, &cond_attr)) != 0) {
70-
ipc::error("fail pthread_cond_init[%d]\n", eno);
71-
return false;
72-
}
73-
finally.dismiss();
53+
if (shm_.ref() > 1) {
54+
return valid();
55+
}
56+
::pthread_cond_destroy(cond_);
57+
auto finally = ipc::guard([this] { close(); }); // close when failed
58+
// init condition
59+
int eno;
60+
pthread_condattr_t cond_attr;
61+
if ((eno = ::pthread_condattr_init(&cond_attr)) != 0) {
62+
ipc::error("fail pthread_condattr_init[%d]\n", eno);
63+
return false;
64+
}
65+
IPC_UNUSED_ auto guard_cond_attr = unique_ptr(&cond_attr, ::pthread_condattr_destroy);
66+
if ((eno = ::pthread_condattr_setpshared(&cond_attr, PTHREAD_PROCESS_SHARED)) != 0) {
67+
ipc::error("fail pthread_condattr_setpshared[%d]\n", eno);
68+
return false;
69+
}
70+
*cond_ = PTHREAD_COND_INITIALIZER;
71+
if ((eno = ::pthread_cond_init(cond_, &cond_attr)) != 0) {
72+
ipc::error("fail pthread_cond_init[%d]\n", eno);
73+
return false;
7474
}
75+
finally.dismiss();
7576
return valid();
7677
}
7778

7879
void close() noexcept {
79-
if (shm_.ref() == 1) {
80+
if ((shm_.ref() <= 1) && cond_ != nullptr) {
8081
int eno;
8182
if ((eno = ::pthread_cond_destroy(cond_)) != 0) {
8283
ipc::error("fail pthread_cond_destroy[%d]\n", eno);
@@ -87,9 +88,8 @@ class condition {
8788
}
8889

8990
bool wait(ipc::sync::mutex &mtx, std::uint64_t tm) noexcept {
91+
if (!valid()) return false;
9092
switch (tm) {
91-
case 0:
92-
return true;
9393
case invalid_value: {
9494
int eno;
9595
if ((eno = ::pthread_cond_wait(cond_, static_cast<pthread_mutex_t *>(mtx.native()))) != 0) {
@@ -115,6 +115,7 @@ class condition {
115115
}
116116

117117
bool notify() noexcept {
118+
if (!valid()) return false;
118119
int eno;
119120
if ((eno = ::pthread_cond_signal(cond_)) != 0) {
120121
ipc::error("fail pthread_cond_signal[%d]\n", eno);
@@ -124,6 +125,7 @@ class condition {
124125
}
125126

126127
bool broadcast() noexcept {
128+
if (!valid()) return false;
127129
int eno;
128130
if ((eno = ::pthread_cond_broadcast(cond_)) != 0) {
129131
ipc::error("fail pthread_cond_broadcast[%d]\n", eno);

src/libipc/platform/mutex_linux.h

Lines changed: 100 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
#include <cstring>
44
#include <cassert>
5+
#include <cstdint>
56
#include <system_error>
67
#include <mutex>
8+
#include <atomic>
79

810
#include <pthread.h>
911

@@ -19,33 +21,61 @@ namespace detail {
1921
namespace sync {
2022

2123
class mutex {
22-
ipc::shm::handle shm_;
24+
ipc::shm::handle *shm_ = nullptr;
25+
std::atomic<std::int32_t> *ref_ = nullptr;
2326
pthread_mutex_t *mutex_ = nullptr;
2427

28+
struct curr_prog {
29+
struct shm_data {
30+
ipc::shm::handle shm;
31+
std::atomic<std::int32_t> ref;
32+
33+
struct init {
34+
char const *name;
35+
std::size_t size;
36+
};
37+
shm_data(init arg)
38+
: shm{arg.name, arg.size}, ref{0} {}
39+
};
40+
ipc::map<ipc::string, shm_data> mutex_handles;
41+
std::mutex lock;
42+
43+
static curr_prog &get() {
44+
static curr_prog info;
45+
return info;
46+
}
47+
};
48+
2549
pthread_mutex_t *acquire_mutex(char const *name) {
26-
if (!shm_.acquire(name, sizeof(pthread_mutex_t))) {
27-
ipc::error("[acquire_mutex] fail shm.acquire: %s\n", name);
50+
if (name == nullptr) {
2851
return nullptr;
2952
}
30-
return static_cast<pthread_mutex_t *>(shm_.get());
53+
auto &info = curr_prog::get();
54+
IPC_UNUSED_ std::lock_guard<std::mutex> guard {info.lock};
55+
auto it = info.mutex_handles.find(name);
56+
if (it == info.mutex_handles.end()) {
57+
it = curr_prog::get().mutex_handles.emplace(name,
58+
curr_prog::shm_data::init{name, sizeof(pthread_mutex_t)}).first;
59+
}
60+
shm_ = &it->second.shm;
61+
ref_ = &it->second.ref;
62+
if (shm_ == nullptr) {
63+
return nullptr;
64+
}
65+
return static_cast<pthread_mutex_t *>(shm_->get());
3166
}
3267

33-
pthread_mutex_t *get_mutex(char const *name) {
34-
if (name == nullptr) {
35-
return nullptr;
68+
template <typename F>
69+
void release_mutex(ipc::string const &name, F &&clear) {
70+
if (name.empty()) return;
71+
IPC_UNUSED_ std::lock_guard<std::mutex> guard {curr_prog::get().lock};
72+
auto it = curr_prog::get().mutex_handles.find(name);
73+
if (it == curr_prog::get().mutex_handles.end()) {
74+
return;
3675
}
37-
static ipc::map<ipc::string, pthread_mutex_t *> mutex_handles;
38-
static std::mutex lock;
39-
IPC_UNUSED_ std::lock_guard<std::mutex> guard {lock};
40-
auto it = mutex_handles.find(name);
41-
if (it == mutex_handles.end()) {
42-
auto ptr = acquire_mutex(name);
43-
if (ptr != nullptr) {
44-
mutex_handles.emplace(name, ptr);
45-
}
46-
return ptr;
76+
if (clear()) {
77+
curr_prog::get().mutex_handles.erase(it);
4778
}
48-
return it->second;
4979
}
5080

5181
public:
@@ -62,56 +92,69 @@ class mutex {
6292

6393
bool valid() const noexcept {
6494
static const char tmp[sizeof(pthread_mutex_t)] {};
65-
return (mutex_ != nullptr)
95+
return (shm_ != nullptr) && (ref_ != nullptr) && (mutex_ != nullptr)
6696
&& (std::memcmp(tmp, mutex_, sizeof(pthread_mutex_t)) != 0);
6797
}
6898

6999
bool open(char const *name) noexcept {
70100
close();
71-
if ((mutex_ = get_mutex(name)) == nullptr) {
101+
if ((mutex_ = acquire_mutex(name)) == nullptr) {
72102
return false;
73103
}
74-
if (shm_.ref() == 1) {
75-
::pthread_mutex_destroy(mutex_);
76-
auto finally = ipc::guard([this] { close(); }); // close when failed
77-
// init mutex
78-
int eno;
79-
pthread_mutexattr_t mutex_attr;
80-
if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) {
81-
ipc::error("fail pthread_mutexattr_init[%d]\n", eno);
82-
return false;
83-
}
84-
IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy);
85-
if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) {
86-
ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno);
87-
return false;
88-
}
89-
if ((eno = ::pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST)) != 0) {
90-
ipc::error("fail pthread_mutexattr_setrobust[%d]\n", eno);
91-
return false;
92-
}
93-
*mutex_ = PTHREAD_MUTEX_INITIALIZER;
94-
if ((eno = ::pthread_mutex_init(mutex_, &mutex_attr)) != 0) {
95-
ipc::error("fail pthread_mutex_init[%d]\n", eno);
96-
return false;
97-
}
98-
finally.dismiss();
104+
auto self_ref = ref_->fetch_add(1, std::memory_order_relaxed);
105+
if (shm_->ref() > 1 || self_ref > 0) {
106+
return valid();
107+
}
108+
::pthread_mutex_destroy(mutex_);
109+
auto finally = ipc::guard([this] { close(); }); // close when failed
110+
// init mutex
111+
int eno;
112+
pthread_mutexattr_t mutex_attr;
113+
if ((eno = ::pthread_mutexattr_init(&mutex_attr)) != 0) {
114+
ipc::error("fail pthread_mutexattr_init[%d]\n", eno);
115+
return false;
116+
}
117+
IPC_UNUSED_ auto guard_mutex_attr = unique_ptr(&mutex_attr, ::pthread_mutexattr_destroy);
118+
if ((eno = ::pthread_mutexattr_setpshared(&mutex_attr, PTHREAD_PROCESS_SHARED)) != 0) {
119+
ipc::error("fail pthread_mutexattr_setpshared[%d]\n", eno);
120+
return false;
121+
}
122+
if ((eno = ::pthread_mutexattr_setrobust(&mutex_attr, PTHREAD_MUTEX_ROBUST)) != 0) {
123+
ipc::error("fail pthread_mutexattr_setrobust[%d]\n", eno);
124+
return false;
99125
}
126+
*mutex_ = PTHREAD_MUTEX_INITIALIZER;
127+
if ((eno = ::pthread_mutex_init(mutex_, &mutex_attr)) != 0) {
128+
ipc::error("fail pthread_mutex_init[%d]\n", eno);
129+
return false;
130+
}
131+
finally.dismiss();
100132
return valid();
101133
}
102134

103135
void close() noexcept {
104-
if (shm_.ref() == 1) {
105-
int eno;
106-
if ((eno = ::pthread_mutex_destroy(mutex_)) != 0) {
107-
ipc::error("fail pthread_mutex_destroy[%d]\n", eno);
108-
}
136+
if ((ref_ != nullptr) && (shm_ != nullptr) && (mutex_ != nullptr)) {
137+
if (shm_->name() != nullptr) {
138+
release_mutex(shm_->name(), [this] {
139+
auto self_ref = ref_->fetch_sub(1, std::memory_order_relaxed);
140+
if ((shm_->ref() <= 1) && (self_ref <= 1)) {
141+
int eno;
142+
if ((eno = ::pthread_mutex_destroy(mutex_)) != 0) {
143+
ipc::error("fail pthread_mutex_destroy[%d]\n", eno);
144+
}
145+
return true;
146+
}
147+
return false;
148+
});
149+
} else shm_->release();
109150
}
110-
shm_.release();
151+
shm_ = nullptr;
152+
ref_ = nullptr;
111153
mutex_ = nullptr;
112154
}
113155

114156
bool lock(std::uint64_t tm) noexcept {
157+
if (!valid()) return false;
115158
for (;;) {
116159
auto ts = detail::make_timespec(tm);
117160
int eno = (tm == invalid_value)
@@ -123,8 +166,8 @@ class mutex {
123166
case ETIMEDOUT:
124167
return false;
125168
case EOWNERDEAD: {
126-
if (shm_.ref() > 1) {
127-
shm_.sub_ref();
169+
if (shm_->ref() > 1) {
170+
shm_->sub_ref();
128171
}
129172
int eno2 = ::pthread_mutex_consistent(mutex_);
130173
if (eno2 != 0) {
@@ -146,6 +189,7 @@ class mutex {
146189
}
147190

148191
bool try_lock() noexcept(false) {
192+
if (!valid()) return false;
149193
auto ts = detail::make_timespec(0);
150194
int eno = ::pthread_mutex_timedlock(mutex_, &ts);
151195
switch (eno) {
@@ -154,8 +198,8 @@ class mutex {
154198
case ETIMEDOUT:
155199
return false;
156200
case EOWNERDEAD: {
157-
if (shm_.ref() > 1) {
158-
shm_.sub_ref();
201+
if (shm_->ref() > 1) {
202+
shm_->sub_ref();
159203
}
160204
int eno2 = ::pthread_mutex_consistent(mutex_);
161205
if (eno2 != 0) {
@@ -177,6 +221,7 @@ class mutex {
177221
}
178222

179223
bool unlock() noexcept {
224+
if (!valid()) return false;
180225
int eno;
181226
if ((eno = ::pthread_mutex_unlock(mutex_)) != 0) {
182227
ipc::error("fail pthread_mutex_unlock[%d]\n", eno);

src/libipc/platform/semaphore_linux.h

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,15 @@ class semaphore {
5050
if (::sem_close(h_) != 0) {
5151
ipc::error("fail sem_close[%d]: %s\n", errno);
5252
}
53-
if (shm_.ref() == 1) {
54-
if (::sem_unlink(shm_.name()) != 0) {
55-
ipc::error("fail sem_unlink[%d]: %s\n", errno);
53+
h_ = SEM_FAILED;
54+
if (shm_.name() != nullptr) {
55+
std::string name = shm_.name();
56+
if (shm_.release() <= 1) {
57+
if (::sem_unlink(name.c_str()) != 0) {
58+
ipc::error("fail sem_unlink[%d]: %s, name: %s\n", errno, name.c_str());
59+
}
5660
}
5761
}
58-
shm_.release();
59-
h_ = SEM_FAILED;
6062
}
6163

6264
bool wait(std::uint64_t tm) noexcept {

0 commit comments

Comments
 (0)