Skip to content

Commit 784fe79

Browse files
committed
Merge branch 'mysql-8.0' into mysql-trunk
Change-Id: Ib43ba2dc405d166387427b638fc674195811706f
2 parents b8ce152 + 0f243bd commit 784fe79

12 files changed

+577
-118
lines changed

router/src/routing/src/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@ ADD_LIBRARY(routing SHARED
5656
connection.cc
5757
blocked_endpoints.cc
5858

59-
processor.cc
59+
await_client_or_server.cc
6060
forwarding_processor.cc
61+
processor.cc
6162

6263
classic_auth.cc
6364
classic_auth_cleartext.cc
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
Copyright (c) 2023, Oracle and/or its affiliates.
3+
4+
This program is free software; you can redistribute it and/or modify
5+
it under the terms of the GNU General Public License, version 2.0,
6+
as published by the Free Software Foundation.
7+
8+
This program is also distributed with certain software (including
9+
but not limited to OpenSSL) that is licensed under separate terms,
10+
as designated in a particular file or component or in included license
11+
documentation. The authors of MySQL hereby grant you an additional
12+
permission to link the program and your derivative works with the
13+
separately licensed software that they have included with MySQL.
14+
15+
This program is distributed in the hope that it will be useful,
16+
but WITHOUT ANY WARRANTY; without even the implied warranty of
17+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18+
GNU General Public License for more details.
19+
20+
You should have received a copy of the GNU General Public License
21+
along with this program; if not, write to the Free Software
22+
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23+
*/
24+
25+
#include "await_client_or_server.h"
26+
27+
#include "classic_connection_base.h"
28+
29+
stdx::expected<Processor::Result, std::error_code>
30+
AwaitClientOrServerProcessor::process() {
31+
switch (stage()) {
32+
case Stage::Init:
33+
return init();
34+
case Stage::WaitBoth:
35+
return wait_both();
36+
case Stage::WaitClientCancelled:
37+
return wait_client_cancelled();
38+
case Stage::WaitServerCancelled:
39+
return wait_server_cancelled();
40+
case Stage::Done:
41+
return Result::Done;
42+
}
43+
44+
harness_assert_this_should_not_execute();
45+
}
46+
47+
stdx::expected<Processor::Result, std::error_code>
48+
AwaitClientOrServerProcessor::init() {
49+
stage(Stage::WaitBoth);
50+
51+
connection()->recv_from_either(
52+
MysqlRoutingClassicConnectionBase::FromEither::Started);
53+
54+
return Result::RecvFromBoth;
55+
}
56+
57+
/**
58+
* wait for an read-event from client and server at the same time.
59+
*
60+
* two async-reads have been started, which both will call wait_both(). Only one
61+
* of the two should continue.
62+
*
63+
* To ensure that event handlers are properly synchronized:
64+
*
65+
* - the first returning event, cancels the other waiter and leaves without
66+
* "returning" (::Void)
67+
* - the cancelled side, continues with executing.
68+
*/
69+
stdx::expected<Processor::Result, std::error_code>
70+
AwaitClientOrServerProcessor::wait_both() {
71+
auto *socket_splicer = connection()->socket_splicer();
72+
73+
switch (connection()->recv_from_either()) {
74+
case MysqlRoutingClassicConnectionBase::FromEither::RecvedFromServer: {
75+
// server side sent something.
76+
//
77+
// - cancel the client side
78+
// - read from server in ::wait_client_cancelled
79+
80+
stage(Stage::WaitClientCancelled);
81+
82+
(void)socket_splicer->client_conn().cancel();
83+
84+
// end this execution branch.
85+
return Result::Void;
86+
}
87+
case MysqlRoutingClassicConnectionBase::FromEither::RecvedFromClient: {
88+
// client side sent something
89+
//
90+
// - cancel the server side
91+
// - read from client in ::wait_server_cancelled
92+
stage(Stage::WaitServerCancelled);
93+
94+
(void)socket_splicer->server_conn().cancel();
95+
96+
// end this execution branch.
97+
return Result::Void;
98+
}
99+
case MysqlRoutingClassicConnectionBase::FromEither::None:
100+
case MysqlRoutingClassicConnectionBase::FromEither::Started:
101+
break;
102+
}
103+
104+
harness_assert_this_should_not_execute();
105+
}
106+
107+
stdx::expected<Processor::Result, std::error_code>
108+
AwaitClientOrServerProcessor::wait_server_cancelled() {
109+
stage(Stage::Done);
110+
111+
on_done_(AwaitResult::ClientReadable);
112+
113+
return Result::Again;
114+
}
115+
116+
/**
117+
* read-event from server while waiting for client command.
118+
*
119+
* - either a connection-close by the server or
120+
* - ERR packet before connection-close.
121+
*/
122+
stdx::expected<Processor::Result, std::error_code>
123+
AwaitClientOrServerProcessor::wait_client_cancelled() {
124+
stage(Stage::Done);
125+
126+
on_done_(AwaitResult::ServerReadable);
127+
128+
return Result::Again;
129+
}
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/*
2+
Copyright (c) 2023, Oracle and/or its affiliates.
3+
4+
This program is free software; you can redistribute it and/or modify
5+
it under the terms of the GNU General Public License, version 2.0,
6+
as published by the Free Software Foundation.
7+
8+
This program is also distributed with certain software (including
9+
but not limited to OpenSSL) that is licensed under separate terms,
10+
as designated in a particular file or component or in included license
11+
documentation. The authors of MySQL hereby grant you an additional
12+
permission to link the program and your derivative works with the
13+
separately licensed software that they have included with MySQL.
14+
15+
This program is distributed in the hope that it will be useful,
16+
but WITHOUT ANY WARRANTY; without even the implied warranty of
17+
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18+
GNU General Public License for more details.
19+
20+
You should have received a copy of the GNU General Public License
21+
along with this program; if not, write to the Free Software
22+
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
23+
*/
24+
25+
#ifndef ROUTING_CLASSIC_AWAIT_CLIENT_OR_SERVER_PROCESSOR_INCLUDED
26+
#define ROUTING_CLASSIC_AWAIT_CLIENT_OR_SERVER_PROCESSOR_INCLUDED
27+
28+
#include "processor.h"
29+
30+
class AwaitClientOrServerProcessor : public BasicProcessor {
31+
public:
32+
enum class AwaitResult {
33+
ClientReadable,
34+
ServerReadable,
35+
};
36+
37+
AwaitClientOrServerProcessor(
38+
MysqlRoutingClassicConnectionBase *conn,
39+
std::function<void(stdx::expected<AwaitResult, std::error_code>)> on_done)
40+
: BasicProcessor(conn), on_done_(std::move(on_done)) {}
41+
42+
stdx::expected<Result, std::error_code> process() override;
43+
44+
private:
45+
enum class Stage {
46+
Init,
47+
WaitBoth,
48+
WaitClientCancelled,
49+
WaitServerCancelled,
50+
Done,
51+
};
52+
53+
void stage(Stage stage) { stage_ = stage; }
54+
Stage stage() const { return stage_; }
55+
56+
stdx::expected<Result, std::error_code> init();
57+
stdx::expected<Result, std::error_code> wait_both();
58+
stdx::expected<Result, std::error_code> wait_client_cancelled();
59+
stdx::expected<Result, std::error_code> wait_server_cancelled();
60+
61+
Stage stage_{Stage::Init};
62+
63+
std::function<void(stdx::expected<AwaitResult, std::error_code>)> on_done_;
64+
};
65+
66+
#endif

router/src/routing/src/classic_command.cc

Lines changed: 33 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <memory> // make_unique
2929
#include <string>
3030

31+
#include "await_client_or_server.h"
3132
#include "classic_binlog_dump_forwarder.h"
3233
#include "classic_change_user_forwarder.h"
3334
#include "classic_clone_forwarder.h"
@@ -55,6 +56,7 @@
5556
#include "harness_assert.h"
5657
#include "hexify.h"
5758
#include "mysql/harness/logging/logging.h"
59+
#include "mysql/harness/stdx/expected.h"
5860
#include "mysql/harness/tls_error.h"
5961
#include "mysqld_error.h"
6062
#include "mysqlrouter/connection_pool.h"
@@ -71,10 +73,6 @@ stdx::expected<Processor::Result, std::error_code> CommandProcessor::process() {
7173
return is_authed();
7274
case Stage::WaitBoth:
7375
return wait_both();
74-
case Stage::WaitClientCancelled:
75-
return wait_client_cancelled();
76-
case Stage::WaitServerCancelled:
77-
return wait_server_cancelled();
7876
case Stage::Command:
7977
return command();
8078
case Stage::Done:
@@ -317,74 +315,43 @@ class SelectSessionCollationConnectionHandler : public QuerySender::Handler {
317315
*/
318316
stdx::expected<Processor::Result, std::error_code>
319317
CommandProcessor::wait_both() {
320-
auto *socket_splicer = connection()->socket_splicer();
321-
322-
if (connection()->recv_from_either() ==
323-
MysqlRoutingClassicConnectionBase::FromEither::RecvedFromServer) {
324-
// server side sent something.
325-
//
326-
// - cancel the client side
327-
// - read from server in ::wait_client_cancelled
328-
329-
stage(Stage::WaitClientCancelled);
318+
if (wait_both_result_) {
319+
switch (*wait_both_result_) {
320+
case AwaitClientOrServerProcessor::AwaitResult::ClientReadable:
321+
stage(Stage::Command);
330322

331-
(void)socket_splicer->client_conn().cancel();
323+
return Result::Again;
324+
case AwaitClientOrServerProcessor::AwaitResult::ServerReadable: {
325+
auto *socket_splicer = connection()->socket_splicer();
332326

333-
// end this execution branch.
334-
return Result::Void;
335-
} else if (connection()->recv_from_either() ==
336-
MysqlRoutingClassicConnectionBase::FromEither::RecvedFromClient) {
337-
// client side sent something
338-
//
339-
// - cancel the server side
340-
// - read from client in ::wait_server_cancelled
341-
stage(Stage::WaitServerCancelled);
342-
343-
(void)socket_splicer->server_conn().cancel();
344-
345-
// end this execution branch.
346-
return Result::Void;
347-
}
327+
auto *src_channel = socket_splicer->server_channel();
328+
auto *src_protocol = connection()->server_protocol();
348329

349-
harness_assert_this_should_not_execute();
350-
}
351-
352-
stdx::expected<Processor::Result, std::error_code>
353-
CommandProcessor::wait_server_cancelled() {
354-
stage(Stage::Command);
355-
356-
return Result::Again;
357-
}
330+
auto read_res =
331+
ClassicFrame::ensure_has_msg_prefix(src_channel, src_protocol);
332+
if (!read_res) return recv_server_failed(read_res.error());
358333

359-
/**
360-
* read-event from server while waiting for client command.
361-
*
362-
* - either a connection-close by the server or
363-
* - ERR packet before connection-close.
364-
*/
365-
stdx::expected<Processor::Result, std::error_code>
366-
CommandProcessor::wait_client_cancelled() {
367-
auto *socket_splicer = connection()->socket_splicer();
334+
stage(Stage::Done);
368335

369-
auto dst_channel = socket_splicer->server_channel();
370-
auto dst_protocol = connection()->server_protocol();
336+
if (auto &tr = tracer()) {
337+
tr.trace(Tracer::Event().stage("server::error"));
338+
}
371339

372-
auto read_res =
373-
ClassicFrame::ensure_has_msg_prefix(dst_channel, dst_protocol);
374-
if (!read_res) return recv_server_failed(read_res.error());
340+
// should be a Error packet.
341+
return forward_server_to_client();
342+
}
343+
}
375344

376-
if (auto &tr = tracer()) {
377-
tr.trace(Tracer::Event().stage("server::error"));
345+
harness_assert_this_should_not_execute();
346+
} else {
347+
return stdx::make_unexpected(wait_both_result_.error());
378348
}
379-
380-
// should be a Error packet.
381-
return forward_server_to_client();
382349
}
383350

384351
stdx::expected<Processor::Result, std::error_code> CommandProcessor::command() {
385352
auto *socket_splicer = connection()->socket_splicer();
386-
auto src_channel = socket_splicer->client_channel();
387-
auto src_protocol = connection()->client_protocol();
353+
auto *src_channel = socket_splicer->client_channel();
354+
auto *src_protocol = connection()->client_protocol();
388355
auto &server_conn = socket_splicer->server_conn();
389356

390357
if (connection()->disconnect_requested()) {
@@ -470,12 +437,14 @@ stdx::expected<Processor::Result, std::error_code> CommandProcessor::command() {
470437
//
471438
// watch server-side for connection-close
472439

473-
stage(Stage::WaitBoth);
440+
connection()->push_processor(
441+
std::make_unique<AwaitClientOrServerProcessor>(
442+
connection(),
443+
[this](auto result) { wait_both_result_ = result; }));
474444

475-
connection()->recv_from_either(
476-
MysqlRoutingClassicConnectionBase::FromEither::Started);
445+
stage(Stage::WaitBoth);
477446

478-
return Result::RecvFromBoth;
447+
return Result::Again;
479448
}
480449
}
481450

router/src/routing/src/classic_command.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@
2727

2828
#include "forwarding_processor.h"
2929

30+
#include "await_client_or_server.h"
31+
3032
class CommandProcessor : public ForwardingProcessor {
3133
public:
3234
using ForwardingProcessor::ForwardingProcessor;
3335

3436
enum class Stage {
3537
IsAuthed,
3638
WaitBoth,
37-
WaitClientCancelled,
38-
WaitServerCancelled,
3939
Command,
4040
Done,
4141
};
@@ -48,13 +48,14 @@ class CommandProcessor : public ForwardingProcessor {
4848
private:
4949
stdx::expected<Result, std::error_code> is_authed();
5050
stdx::expected<Result, std::error_code> wait_both();
51-
stdx::expected<Result, std::error_code> wait_client_cancelled();
52-
stdx::expected<Result, std::error_code> wait_server_cancelled();
5351
stdx::expected<Result, std::error_code> command();
5452

5553
void client_idle_timeout();
5654

5755
Stage stage_{Stage::IsAuthed};
56+
57+
stdx::expected<AwaitClientOrServerProcessor::AwaitResult, std::error_code>
58+
wait_both_result_{};
5859
};
5960

6061
#endif

0 commit comments

Comments
 (0)