Skip to content

Commit 58feb4b

Browse files
lkotuladahlerlend
authored andcommitted
Bug#36347485 - Remove libevent usage from X Plugin
Description =========== MySQL Server source code, contains "net_ts" which may be used as alternative for "libevent". The "net_ts" is maintained and Oracle homemade. Fix === Refactored X Plugin code to use "net::io_context", instead "libevent". Change-Id: I8f3945110feebf041c22b7abafa1b0a872661f4f
1 parent ed3a54d commit 58feb4b

File tree

12 files changed

+129
-140
lines changed

12 files changed

+129
-140
lines changed

CMakeLists.txt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2348,10 +2348,11 @@ CONFIGURE_FILE(${CMAKE_SOURCE_DIR}/sql/sql_builtin.cc.in
23482348
${CMAKE_BINARY_DIR}/sql/sql_builtin.cc)
23492349

23502350
# depends on mysql_version.h to exist
2351-
IF(NOT WITHOUT_SERVER)
2352-
IF(WITH_ROUTER)
2353-
ADD_SUBDIRECTORY(router)
2354-
ENDIF()
2351+
IF(NOT WITHOUT_SERVER AND WITH_ROUTER)
2352+
ADD_SUBDIRECTORY(router)
2353+
ELSE()
2354+
# Define harness_net_ts target and public include directories.
2355+
ADD_SUBDIRECTORY(router/src/harness/include)
23552356
ENDIF()
23562357

23572358
IF(ENABLE_GCOV)

plugin/x/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ IF(WITH_MYSQLX)
8888
DEFAULT
8989
DEPENDENCIES xprotocol_tags
9090
LINK_LIBRARIES
91-
ext::libevent
91+
harness_net_ts
9292
ext::icu
9393
${MYSQLX_PROTOCOL_LIB}
9494
extra::rapidjson

plugin/x/src/ngs/socket_events.cc

Lines changed: 76 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@
2828
#include <mutex> // NOLINT(build/c++11)
2929
#include <new>
3030

31-
#include "my_io.h" // NOLINT(build/include_subdir)
31+
#include "my_io.h" // NOLINT(build/include_subdir)
32+
#include "mysql/psi/mysql_socket.h"
3233
#include "violite.h" // NOLINT(build/include_subdir)
3334

3435
#include "plugin/x/src/interface/connection_acceptor.h"
@@ -37,31 +38,12 @@
3738
#include "plugin/x/src/ngs/socket_events.h"
3839
#include "plugin/x/src/operations_factory.h"
3940

40-
// Surpressing numerous warnings generated by libevent on Windows.
41-
#ifdef WIN32
42-
#pragma warning(push)
43-
#pragma warning(disable : 4005)
44-
#endif // WIN32
45-
#include <event2/event.h> // libevent
46-
#include <event2/event_compat.h>
47-
#include <event2/event_struct.h>
48-
#include <event2/thread.h>
49-
#ifdef WIN32
50-
#pragma warning(pop)
51-
#endif // WIN32
52-
53-
#ifdef EVTHREAD_USE_WINDOWS_THREADS_IMPLEMENTED
54-
#define XPL_EVTHREAD_INITIALIZE() evthread_use_windows_threads()
55-
#elif EVTHREAD_USE_PTHREADS_IMPLEMENTED
56-
#define XPL_EVTHREAD_INITIALIZE() evthread_use_pthreads()
57-
#else
58-
#define XPL_EVTHREAD_INITIALIZE() \
59-
do { \
60-
} while (0)
61-
#endif
62-
6341
namespace ngs {
6442

43+
using TcpAcceptor = net::ip::tcp::acceptor;
44+
using Socket = TcpAcceptor::socket_type;
45+
using Endpoint_type = TcpAcceptor::endpoint_type;
46+
6547
class Connection_acceptor_socket : public xpl::iface::Connection_acceptor {
6648
public:
6749
using Socket_ptr = std::shared_ptr<xpl::iface::Socket>;
@@ -112,65 +94,55 @@ class Connection_acceptor_socket : public xpl::iface::Connection_acceptor {
11294
static const int MAX_ACCEPT_REATTEMPT = 10;
11395
};
11496

115-
struct Socket_events::Timer_data {
116-
std::function<bool()> callback;
117-
event ev;
118-
timeval tv;
119-
Socket_events *self;
97+
class Socket_events::EntryTimer {
98+
public:
99+
EntryTimer(net::io_context &io) : timer{io} {}
120100

121-
static void free(Timer_data *data) {
122-
evtimer_del(&data->ev);
123-
free_object(data);
124-
}
101+
public:
102+
std::function<bool()> callback;
103+
std::chrono::milliseconds duration;
104+
net::steady_timer timer;
125105
};
126106

127-
struct Socket_events::Socket_data {
107+
class Socket_events::EntryAcceptingSocket {
108+
public:
109+
EntryAcceptingSocket(net::io_context &io) : acceptor{io} {}
110+
~EntryAcceptingSocket() { acceptor.release(); }
111+
112+
public:
128113
std::function<void(xpl::iface::Connection_acceptor &)> callback;
129-
event ev;
130114
std::shared_ptr<xpl::iface::Socket> socket;
131-
132-
static void free(Socket_data *data) {
133-
event_del(&data->ev);
134-
free_object(data);
135-
}
115+
TcpAcceptor acceptor;
136116
};
137117

138-
Socket_events::Socket_events() {
139-
static std::once_flag flag_event_threads_initialized;
140-
141-
std::call_once(flag_event_threads_initialized,
142-
[]() { XPL_EVTHREAD_INITIALIZE(); });
143-
144-
m_evbase = event_base_new();
145-
146-
if (!m_evbase) throw std::bad_alloc();
147-
}
118+
Socket_events::Socket_events() {}
148119

149120
Socket_events::~Socket_events() {
150121
std::for_each(m_timer_events.begin(), m_timer_events.end(),
151-
&Timer_data::free);
122+
&free_object<EntryTimer>);
152123

153124
std::for_each(m_socket_events.begin(), m_socket_events.end(),
154-
&Socket_data::free);
155-
156-
event_base_free(m_evbase);
125+
&free_object<EntryAcceptingSocket>);
157126
}
158127

159128
bool Socket_events::listen(
160129
std::shared_ptr<xpl::iface::Socket> sock,
161130
std::function<void(xpl::iface::Connection_acceptor &)> callback) {
162-
m_socket_events.push_back(allocate_object<Socket_data>());
163-
Socket_data *socket_event = m_socket_events.back();
131+
m_socket_events.push_back(
132+
allocate_object<EntryAcceptingSocket>(m_io_context));
133+
EntryAcceptingSocket *socket_event = m_socket_events.back();
164134

165135
socket_event->callback = callback;
166136
socket_event->socket = sock;
137+
socket_event->acceptor.assign(Endpoint_type().protocol(),
138+
sock->get_socket_fd());
167139

168-
event_set(&socket_event->ev, static_cast<int>(sock->get_socket_fd()),
169-
EV_READ | EV_PERSIST, &Socket_events::socket_data_avaiable,
170-
socket_event);
171-
event_base_set(m_evbase, &socket_event->ev);
140+
socket_event->acceptor.async_wait(Socket::wait_read,
141+
[this, socket_event](std::error_code ec) {
142+
callback_accept_socket(socket_event, ec);
143+
});
172144

173-
return 0 == event_add(&socket_event->ev, nullptr);
145+
return true;
174146
}
175147

176148
/** Register a callback to be executed in a fixed time interval.
@@ -182,53 +154,61 @@ NOTE: This method may only be called from the same thread as the event loop.
182154
*/
183155
void Socket_events::add_timer(const std::size_t delay_ms,
184156
std::function<bool()> callback) {
185-
Timer_data *data = allocate_object<Timer_data>();
186-
data->tv.tv_sec = static_cast<long>(delay_ms / 1000);
187-
data->tv.tv_usec = (delay_ms % 1000) * 1000;
188-
data->callback = callback;
189-
data->self = this;
190-
// XXX use persistent timer events after switch to libevent2
191-
evtimer_set(&data->ev, timeout_call, data);
192-
event_base_set(m_evbase, &data->ev);
193-
evtimer_add(&data->ev, &data->tv);
194-
195-
MUTEX_LOCK(lock, m_timers_mutex);
196-
m_timer_events.push_back(data);
197-
}
198-
199-
void Socket_events::loop() { event_base_loop(m_evbase, 0); }
157+
EntryTimer *timer_entry = allocate_object<EntryTimer>(m_io_context);
158+
timer_entry->duration = std::chrono::milliseconds{delay_ms};
159+
timer_entry->callback = callback;
160+
{
161+
MUTEX_LOCK(lock, m_timers_mutex);
162+
m_timer_events.push_back(timer_entry);
163+
}
200164

201-
void Socket_events::break_loop() { event_base_loopexit(m_evbase, nullptr); }
165+
timer_entry->timer.expires_after(timer_entry->duration);
166+
timer_entry->timer.async_wait([this, timer_entry](std::error_code ec) {
167+
callback_timeout(timer_entry, ec);
168+
});
169+
}
202170

203-
void Socket_events::timeout_call(socket_type, short, void *arg) {
204-
Timer_data *data = static_cast<Timer_data *>(arg);
171+
void Socket_events::loop() { m_io_context.run(); }
205172

206-
if (!data->callback()) {
207-
evtimer_del(&data->ev);
173+
void Socket_events::break_loop() { m_io_context.stop(); }
208174

175+
void Socket_events::callback_timeout(EntryTimer *timer_entry,
176+
std::error_code ec) {
177+
if (ec || !timer_entry->callback()) {
209178
{
210-
MUTEX_LOCK(timer_lock, data->self->m_timers_mutex);
211-
data->self->m_timer_events.erase(
212-
std::remove(data->self->m_timer_events.begin(),
213-
data->self->m_timer_events.end(), data),
214-
data->self->m_timer_events.end());
179+
MUTEX_LOCK(timer_lock, m_timers_mutex);
180+
181+
m_timer_events.erase(std::remove(m_timer_events.begin(),
182+
m_timer_events.end(), timer_entry),
183+
m_timer_events.end());
215184
}
216185

217-
free_object(data);
186+
free_object(timer_entry);
218187
} else {
219188
// schedule for another round
220-
evtimer_add(&data->ev, &data->tv);
189+
timer_entry->timer.expires_after(timer_entry->duration);
190+
timer_entry->timer.async_wait([this, timer_entry](std::error_code ec) {
191+
callback_timeout(timer_entry, ec);
192+
});
221193
}
222194
}
223195

224-
void Socket_events::socket_data_avaiable(socket_type, short, void *arg) {
225-
Socket_data *data = static_cast<Socket_data *>(arg);
226-
xpl::Operations_factory operations_factory;
227-
std::shared_ptr<xpl::iface::System> system_interface(
228-
operations_factory.create_system_interface());
229-
Connection_acceptor_socket acceptor(data->socket, *system_interface);
230-
231-
data->callback(acceptor);
196+
void Socket_events::callback_accept_socket(
197+
EntryAcceptingSocket *acceptors_entry, std::error_code ec) {
198+
if (!ec) {
199+
xpl::Operations_factory operations_factory;
200+
std::shared_ptr<xpl::iface::System> system_interface(
201+
operations_factory.create_system_interface());
202+
Connection_acceptor_socket vio_socket_forge(acceptors_entry->socket,
203+
*system_interface);
204+
205+
acceptors_entry->callback(vio_socket_forge);
206+
207+
acceptors_entry->acceptor.async_wait(
208+
Socket::wait_read, [this, acceptors_entry](std::error_code ec) {
209+
callback_accept_socket(acceptors_entry, ec);
210+
});
211+
}
232212
}
233213

234214
} // namespace ngs

plugin/x/src/ngs/socket_events.h

Lines changed: 13 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -29,22 +29,18 @@
2929
#include <memory>
3030
#include <vector>
3131

32+
#include "mysql/harness/net_ts.h"
3233
#include "plugin/x/src/helper/multithread/mutex.h"
3334
#include "plugin/x/src/interface/socket_events.h"
3435
#include "plugin/x/src/xpl_performance_schema.h"
3536

36-
struct event_base;
37-
3837
namespace ngs {
3938

4039
class Socket_events : public xpl::iface::Socket_events {
40+
private:
41+
using Socket = net::ip::tcp::socket;
42+
4143
public:
42-
#ifdef _WIN32
43-
// mimick evutil_socket_t in libevent-2.x
44-
using socket_type = intptr_t;
45-
#else
46-
using socket_type = int;
47-
#endif
4844
Socket_events();
4945
~Socket_events() override;
5046

@@ -58,14 +54,16 @@ class Socket_events : public xpl::iface::Socket_events {
5854
void break_loop() override;
5955

6056
private:
61-
static void timeout_call(socket_type sock, short which, void *arg);
62-
static void socket_data_avaiable(socket_type sock, short which, void *arg);
57+
class EntryTimer;
58+
class EntryAcceptingSocket;
59+
60+
void callback_timeout(EntryTimer *timer_entry, std::error_code ec);
61+
void callback_accept_socket(EntryAcceptingSocket *acceptors_entry,
62+
std::error_code ec);
6363

64-
struct Timer_data;
65-
struct Socket_data;
66-
struct event_base *m_evbase;
67-
std::vector<Socket_data *> m_socket_events;
68-
std::vector<Timer_data *> m_timer_events;
64+
net::io_context m_io_context;
65+
std::vector<EntryAcceptingSocket *> m_socket_events;
66+
std::vector<EntryTimer *> m_timer_events;
6967
xpl::Mutex m_timers_mutex{KEY_mutex_x_socket_events_timers};
7068
};
7169

router/src/harness/CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
ADD_SUBDIRECTORY(src)
2525
ADD_SUBDIRECTORY(include)
2626

27-
IF(WITH_UNIT_TESTS)
27+
IF(WITH_UNIT_TESTS AND WITH_ROUTER)
2828
ADD_SUBDIRECTORY(shared)
2929
ADD_SUBDIRECTORY(tests)
3030
ENDIF()

router/src/harness/include/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ TARGET_SOURCES(harness_net_ts
5353
${CMAKE_CURRENT_SOURCE_DIR}/mysql/harness/net_ts/timer.h
5454
${CMAKE_CURRENT_SOURCE_DIR}/mysql/harness/net_ts/win32_named_pipe.h
5555
)
56-
TARGET_LINK_LIBRARIES(harness_net_ts INTERFACE harness_stdx)
5756

5857
IF(CMAKE_SYSTEM_NAME STREQUAL "SunOS")
5958
TARGET_LINK_LIBRARIES(harness_net_ts INTERFACE socket;nsl)

router/src/harness/include/mysql/harness/net_ts/buffer.h

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -902,11 +902,12 @@ read(SyncReadStream &stream, DynamicBuffer &&b, CompletionCondition cond) {
902902

903903
// if socket was non-blocking and some bytes where already read, return
904904
// the success
905-
const auto ec = res.error();
906-
if ((ec == make_error_condition(
907-
std::errc::resource_unavailable_try_again) ||
908-
ec == make_error_condition(std::errc::operation_would_block) ||
909-
ec == net::stream_errc::eof) &&
905+
const auto error_code = res.error();
906+
if ((error_code == make_error_condition(
907+
std::errc::resource_unavailable_try_again) ||
908+
error_code ==
909+
make_error_condition(std::errc::operation_would_block) ||
910+
error_code == net::stream_errc::eof) &&
910911
transferred != 0) {
911912
return transferred;
912913
}

router/src/harness/include/mysql/harness/net_ts/io_context.h

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -499,9 +499,10 @@ class io_context : public execution_context {
499499
// code should be similar to ::cancel(fd)
500500
std::lock_guard<std::mutex> lk(mtx_);
501501

502-
if (auto op = active_ops_.extract_first(fd, static_cast<short>(wt))) {
503-
op->cancel();
504-
cancelled_ops_.push_back(std::move(op));
502+
if (auto async_op =
503+
active_ops_.extract_first(fd, static_cast<short>(wt))) {
504+
async_op->cancel();
505+
cancelled_ops_.push_back(std::move(async_op));
505506
}
506507
}
507508
}
@@ -1175,7 +1176,7 @@ inline io_context::count_type io_context::do_one(
11751176
// timer
11761177
std::chrono::milliseconds min_duration{0};
11771178
{
1178-
std::lock_guard<std::mutex> lk(mtx_);
1179+
std::lock_guard<std::mutex> lock(mtx_);
11791180
// check the smallest timestamp of all timer-queues
11801181
for (auto q : timer_queues_) {
11811182
const auto duration = q->next();
@@ -1198,16 +1199,16 @@ inline io_context::count_type io_context::do_one(
11981199

11991200
if (auto op = [this]() -> std::unique_ptr<async_op> {
12001201
// handle all the cancelled ops without polling first
1201-
std::lock_guard<std::mutex> lk(mtx_);
1202+
std::lock_guard<std::mutex> lock(mtx_);
12021203

12031204
// ops have all cancelled operators at the front
12041205
if (!cancelled_ops_.empty() &&
12051206
cancelled_ops_.front()->is_cancelled()) {
1206-
auto op = std::move(cancelled_ops_.front());
1207+
auto cancelled_op = std::move(cancelled_ops_.front());
12071208

12081209
cancelled_ops_.pop_front();
12091210

1210-
return op;
1211+
return cancelled_op;
12111212
}
12121213

12131214
return {};
@@ -1273,7 +1274,7 @@ inline io_context::count_type io_context::do_one(
12731274

12741275
if (auto op = [this](native_handle_type fd,
12751276
short events) -> std::unique_ptr<async_op> {
1276-
std::lock_guard<std::mutex> lk(mtx_);
1277+
std::lock_guard<std::mutex> lock(mtx_);
12771278

12781279
return active_ops_.extract_first(fd, events);
12791280
}(res->fd, res->event)) {

0 commit comments

Comments
 (0)