Skip to content

Commit d0f9653

Browse files
committed
recycle storage for large message
1 parent b992b5f commit d0f9653

File tree

4 files changed

+102
-30
lines changed

4 files changed

+102
-30
lines changed

src/ipc.cpp

Lines changed: 97 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include "libipc/utility/log.h"
2222
#include "libipc/utility/id_pool.h"
23+
#include "libipc/utility/scope_guard.h"
2324
#include "libipc/utility/utility.h"
2425

2526
#include "libipc/memory/resource.h"
@@ -39,7 +40,7 @@ struct msg_t;
3940

4041
template <std::size_t AlignSize>
4142
struct msg_t<0, AlignSize> {
42-
msg_id_t conn_;
43+
msg_id_t cc_id_;
4344
msg_id_t id_;
4445
std::int32_t remain_;
4546
bool storage_;
@@ -50,8 +51,8 @@ struct msg_t : msg_t<0, AlignSize> {
5051
std::aligned_storage_t<DataSize, AlignSize> data_ {};
5152

5253
msg_t() = default;
53-
msg_t(msg_id_t conn, msg_id_t id, std::int32_t remain, void const * data, std::size_t size)
54-
: msg_t<0, AlignSize> {conn, id, remain, (data == nullptr) || (size == 0)} {
54+
msg_t(msg_id_t cc_id, msg_id_t id, std::int32_t remain, void const * data, std::size_t size)
55+
: msg_t<0, AlignSize> {cc_id, id, remain, (data == nullptr) || (size == 0)} {
5556
if (this->storage_) {
5657
if (data != nullptr) {
5758
// copy storage-id
@@ -96,9 +97,21 @@ IPC_CONSTEXPR_ std::size_t align_chunk_size(std::size_t size) noexcept {
9697
}
9798

9899
IPC_CONSTEXPR_ std::size_t calc_chunk_size(std::size_t size) noexcept {
99-
return ipc::make_align(alignof(std::max_align_t), align_chunk_size(size));
100+
return ipc::make_align(alignof(std::max_align_t), align_chunk_size(
101+
ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic<ipc::circ::cc_t>)) + size));
100102
}
101103

104+
struct chunk_t {
105+
std::atomic<ipc::circ::cc_t> &conns() noexcept {
106+
return *reinterpret_cast<std::atomic<ipc::circ::cc_t> *>(this);
107+
}
108+
109+
void *data() noexcept {
110+
return reinterpret_cast<ipc::byte_t *>(this)
111+
+ ipc::make_align(alignof(std::max_align_t), sizeof(std::atomic<ipc::circ::cc_t>));
112+
}
113+
};
114+
102115
struct chunk_info_t {
103116
ipc::id_pool<> pool_;
104117
ipc::spin_lock lock_;
@@ -107,13 +120,13 @@ struct chunk_info_t {
107120
return ipc::id_pool<>::max_count * chunk_size;
108121
}
109122

110-
ipc::byte_t* chunks_mem() noexcept {
111-
return reinterpret_cast<ipc::byte_t*>(this + 1);
123+
ipc::byte_t *chunks_mem() noexcept {
124+
return reinterpret_cast<ipc::byte_t *>(this + 1);
112125
}
113126

114-
ipc::byte_t* at(std::size_t chunk_size, ipc::storage_id_t id) noexcept {
127+
chunk_t *at(std::size_t chunk_size, ipc::storage_id_t id) noexcept {
115128
if (id < 0) return nullptr;
116-
return chunks_mem() + (chunk_size * id);
129+
return reinterpret_cast<chunk_t *>(chunks_mem() + (chunk_size * id));
117130
}
118131
};
119132

@@ -145,7 +158,7 @@ chunk_info_t *chunk_storage_info(std::size_t chunk_size) {
145158
return chunk_storages()[chunk_size].get_info(chunk_size);
146159
}
147160

148-
std::pair<ipc::storage_id_t, void*> acquire_storage(std::size_t size) {
161+
std::pair<ipc::storage_id_t, void*> acquire_storage(std::size_t size, ipc::circ::cc_t conns) {
149162
std::size_t chunk_size = calc_chunk_size(size);
150163
auto info = chunk_storage_info(chunk_size);
151164
if (info == nullptr) return {};
@@ -156,7 +169,10 @@ std::pair<ipc::storage_id_t, void*> acquire_storage(std::size_t size) {
156169
auto id = info->pool_.acquire();
157170
info->lock_.unlock();
158171

159-
return { id, info->at(chunk_size, id) };
172+
auto chunk = info->at(chunk_size, id);
173+
if (chunk == nullptr) return {};
174+
chunk->conns().store(conns, std::memory_order_relaxed);
175+
return { id, chunk->data() };
160176
}
161177

162178
void *find_storage(ipc::storage_id_t id, std::size_t size) {
@@ -167,7 +183,7 @@ void *find_storage(ipc::storage_id_t id, std::size_t size) {
167183
std::size_t chunk_size = calc_chunk_size(size);
168184
auto info = chunk_storage_info(chunk_size);
169185
if (info == nullptr) return nullptr;
170-
return info->at(chunk_size, id);
186+
return info->at(chunk_size, id)->data();
171187
}
172188

173189
void release_storage(ipc::storage_id_t id, std::size_t size) {
@@ -183,13 +199,53 @@ void release_storage(ipc::storage_id_t id, std::size_t size) {
183199
info->lock_.unlock();
184200
}
185201

202+
template <ipc::relat Rp, ipc::relat Rc>
203+
bool sub_rc(ipc::wr<Rp, Rc, ipc::trans::unicast>,
204+
std::atomic<ipc::circ::cc_t> &/*conns*/, ipc::circ::cc_t /*curr_conns*/, ipc::circ::cc_t /*conn_id*/) noexcept {
205+
return true;
206+
}
207+
208+
template <ipc::relat Rp, ipc::relat Rc>
209+
bool sub_rc(ipc::wr<Rp, Rc, ipc::trans::broadcast>,
210+
std::atomic<ipc::circ::cc_t> &conns, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) noexcept {
211+
auto last_conns = curr_conns & ~conn_id;
212+
for (unsigned k = 0;;) {
213+
auto chunk_conns = conns.load(std::memory_order_acquire);
214+
if (conns.compare_exchange_weak(chunk_conns, chunk_conns & last_conns, std::memory_order_release)) {
215+
return (chunk_conns & last_conns) == 0;
216+
}
217+
ipc::yield(k);
218+
}
219+
}
220+
221+
template <typename Flag>
222+
void recycle_storage(ipc::storage_id_t id, std::size_t size, ipc::circ::cc_t curr_conns, ipc::circ::cc_t conn_id) {
223+
if (id < 0) {
224+
ipc::error("[recycle_storage] id is invalid: id = %ld, size = %zd\n", (long)id, size);
225+
return;
226+
}
227+
std::size_t chunk_size = calc_chunk_size(size);
228+
auto info = chunk_storage_info(chunk_size);
229+
if (info == nullptr) return;
230+
231+
auto chunk = info->at(chunk_size, id);
232+
if (chunk == nullptr) return;
233+
234+
if (!sub_rc(Flag{}, chunk->conns(), curr_conns, conn_id)) {
235+
return;
236+
}
237+
info->lock_.lock();
238+
info->pool_.release(id);
239+
info->lock_.unlock();
240+
}
241+
186242
template <typename MsgT>
187-
bool recycle_message(void* p) {
243+
bool clear_message(void* p) {
188244
auto msg = static_cast<MsgT*>(p);
189245
if (msg->storage_) {
190246
std::int32_t r_size = static_cast<std::int32_t>(ipc::data_length) + msg->remain_;
191247
if (r_size <= 0) {
192-
ipc::error("[recycle_message] invalid msg size: %d\n", (int)r_size);
248+
ipc::error("[clear_message] invalid msg size: %d\n", (int)r_size);
193249
return true;
194250
}
195251
release_storage(
@@ -278,8 +334,10 @@ struct queue_generator {
278334
template <typename Policy>
279335
struct detail_impl {
280336

281-
using queue_t = typename queue_generator<Policy>::queue_t;
282-
using conn_info_t = typename queue_generator<Policy>::conn_info_t;
337+
using policy_t = Policy;
338+
using flag_t = typename policy_t::flag_t;
339+
using queue_t = typename queue_generator<policy_t>::queue_t;
340+
using conn_info_t = typename queue_generator<policy_t>::conn_info_t;
283341

284342
constexpr static conn_info_t* info_of(ipc::handle_t h) noexcept {
285343
return static_cast<conn_info_t*>(h);
@@ -373,7 +431,8 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
373431
ipc::error("fail: send, que->ready_sending() == false\n");
374432
return false;
375433
}
376-
if (que->elems()->connections(std::memory_order_relaxed) == 0) {
434+
ipc::circ::cc_t conns = que->elems()->connections(std::memory_order_relaxed);
435+
if (conns == 0) {
377436
ipc::error("fail: send, there is no receiver on this connection.\n");
378437
return false;
379438
}
@@ -386,7 +445,7 @@ static bool send(F&& gen_push, ipc::handle_t h, void const * data, std::size_t s
386445
auto msg_id = acc->fetch_add(1, std::memory_order_relaxed);
387446
auto try_push = std::forward<F>(gen_push)(info_of(h), que, msg_id);
388447
if (size > ipc::large_msg_limit) {
389-
auto dat = acquire_storage(size);
448+
auto dat = acquire_storage(size, conns);
390449
void * buf = dat.second;
391450
if (buf != nullptr) {
392451
std::memcpy(buf, data, size);
@@ -426,7 +485,7 @@ static bool send(ipc::handle_t h, void const * data, std::size_t size, std::size
426485
}, tm)) {
427486
ipc::log("force_push: msg_id = %zd, remain = %d, size = %zd\n", msg_id, remain, size);
428487
if (!que->force_push(
429-
recycle_message<typename queue_t::value_t>,
488+
clear_message<typename queue_t::value_t>,
430489
info->cc_id_, msg_id, remain, data, size)) {
431490
return false;
432491
}
@@ -467,15 +526,14 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
467526
for (;;) {
468527
// pop a new message
469528
typename queue_t::value_t msg;
470-
bool recycled = false;
471-
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg, &recycled] {
472-
return !que->pop(msg, [&recycled](bool r) { recycled = r; });
529+
if (!wait_for(info_of(h)->rd_waiter_, [que, &msg] {
530+
return !que->pop(msg);
473531
}, tm)) {
474532
// pop failed, just return.
475533
return {};
476534
}
477535
info_of(h)->wt_waiter_.broadcast();
478-
if ((info_of(h)->acc() != nullptr) && (msg.conn_ == info_of(h)->cc_id_)) {
536+
if ((info_of(h)->acc() != nullptr) && (msg.cc_id_ == info_of(h)->cc_id_)) {
479537
continue; // ignore message to self
480538
}
481539
// msg.remain_ may minus & abs(msg.remain_) < data_length
@@ -490,12 +548,24 @@ static ipc::buff_t recv(ipc::handle_t h, std::size_t tm) {
490548
ipc::storage_id_t buf_id = *reinterpret_cast<ipc::storage_id_t*>(&msg.data_);
491549
void* buf = find_storage(buf_id, msg_size);
492550
if (buf != nullptr) {
493-
if (recycled) {
494-
return ipc::buff_t{buf, msg_size, [](void* pmid, std::size_t size) {
495-
release_storage(ipc::detail::horrible_cast<ipc::storage_id_t>(pmid) - 1, size);
496-
}, ipc::detail::horrible_cast<void*>(buf_id + 1)};
497-
} else {
551+
struct recycle_t {
552+
ipc::storage_id_t storage_id;
553+
ipc::circ::cc_t curr_conns;
554+
ipc::circ::cc_t conn_id;
555+
} *r_info = ipc::mem::alloc<recycle_t>(recycle_t{
556+
buf_id, que->elems()->connections(std::memory_order_relaxed), que->connected_id()
557+
});
558+
if (r_info == nullptr) {
559+
ipc::log("fail: ipc::mem::alloc<recycle_t>.\n");
498560
return ipc::buff_t{buf, msg_size}; // no recycle
561+
} else {
562+
return ipc::buff_t{buf, msg_size, [](void* p_info, std::size_t size) {
563+
auto r_info = static_cast<recycle_t *>(p_info);
564+
IPC_UNUSED_ auto finally = ipc::guard([r_info] {
565+
ipc::mem::free(r_info);
566+
});
567+
recycle_storage<flag_t>(r_info->storage_id, size, r_info->curr_conns, r_info->conn_id);
568+
}, r_info};
499569
}
500570
} else {
501571
ipc::log("fail: shm::handle for large message. msg_id: %zd, buf_id: %zd, size: %zd\n", msg.id_, buf_id, msg_size);

src/libipc/circ/elem_def.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ namespace circ {
1616
using u1_t = ipc::uint_t<8>;
1717
using u2_t = ipc::uint_t<32>;
1818

19-
/** only supports max 32 connections */
19+
/** only supports max 32 connections in broadcast mode */
2020
using cc_t = u2_t;
2121

2222
constexpr u1_t index_of(u2_t c) noexcept {

src/libipc/policy.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@ struct choose;
1515

1616
template <typename Flag>
1717
struct choose<circ::elem_array, Flag> {
18+
using flag_t = Flag;
19+
1820
template <std::size_t DataSize, std::size_t AlignSize>
19-
using elems_t = circ::elem_array<ipc::prod_cons_impl<Flag>, DataSize, AlignSize>;
21+
using elems_t = circ::elem_array<ipc::prod_cons_impl<flag_t>, DataSize, AlignSize>;
2022
};
2123

2224
} // namespace policy

test/test_ipc.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ TEST(IPC, 1vN) {
170170
//test_sr<relat::single, relat::multi , trans::unicast >("smu", 1, MultiMax);
171171
//test_sr<relat::multi , relat::multi , trans::unicast >("mmu", 1, MultiMax);
172172
test_sr<relat::single, relat::multi , trans::broadcast>("smb", 1, MultiMax);
173-
//test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", 1, MultiMax);
173+
test_sr<relat::multi , relat::multi , trans::broadcast>("mmb", 1, MultiMax);
174174
}
175175

176176
TEST(IPC, Nv1) {

0 commit comments

Comments
 (0)