Skip to content

Commit a4f2d63

Browse files
authored
feat: update sse-contract-tests to use foxy's server_session (#43)
Updates sse-contract-tests to use foxy::server_session and coroutines.
1 parent ec48bba commit a4f2d63

File tree

7 files changed

+152
-264
lines changed

7 files changed

+152
-264
lines changed

apps/sse-contract-tests/CMakeLists.txt

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,14 @@ project(
88
LANGUAGES CXX
99
)
1010

11+
set(Boost_USE_STATIC_LIBS ON)
12+
set(Boost_USE_MULTITHREADED ON)
13+
set(Boost_USE_STATIC_RUNTIME OFF)
14+
find_package(Boost 1.80 REQUIRED COMPONENTS coroutine)
15+
16+
1117
include(${CMAKE_FILES}/json.cmake)
18+
include(${CMAKE_FILES}/foxy.cmake)
1219

1320
add_executable(sse-tests
1421
src/main.cpp
@@ -21,7 +28,9 @@ add_executable(sse-tests
2128
target_link_libraries(sse-tests PRIVATE
2229
launchdarkly::sse
2330
launchdarkly::common
31+
foxy
2432
nlohmann_json::nlohmann_json
33+
Boost::coroutine
2534
)
2635

2736
target_include_directories(sse-tests PUBLIC include)

apps/sse-contract-tests/include/server.hpp

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,44 +7,42 @@
77
#include "entity_manager.hpp"
88
#include "logger.hpp"
99

10+
#include <foxy/listener.hpp>
11+
1012
namespace net = boost::asio; // from <boost/asio.hpp>
1113

1214
#include <vector>
1315

1416
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
1517

16-
class server : public std::enable_shared_from_this<server> {
17-
net::io_context& ioc_;
18-
tcp::acceptor acceptor_;
18+
class server {
19+
foxy::listener listener_;
1920
EntityManager entity_manager_;
2021
std::vector<std::string> caps_;
2122
launchdarkly::Logger& logger_;
2223

2324
public:
2425
/**
2526
* Constructs a server, which stands up a REST API at the given
26-
* port and address.
27+
* port and address. The server is ready to accept connections upon
28+
* construction.
2729
* @param ioc IO context.
2830
* @param address Address to bind.
2931
* @param port Port to bind.
3032
* @param logger Logger.
3133
*/
3234
server(net::io_context& ioc,
3335
std::string const& address,
34-
std::string const& port,
36+
unsigned short port,
3537
launchdarkly::Logger& logger);
3638
/**
3739
* Advertise an optional test-harness capability, such as "comments".
3840
* @param cap
3941
*/
4042
void add_capability(std::string cap);
43+
4144
/**
42-
* Begins an async operation to start accepting requests.
45+
* Shuts down the server.
4346
*/
44-
void run();
45-
46-
private:
47-
void do_accept();
48-
void on_accept(boost::system::error_code const& ec, tcp::socket socket);
49-
void fail(boost::beast::error_code ec, char const* what);
47+
void shutdown();
5048
};

apps/sse-contract-tests/include/session.hpp

Lines changed: 66 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -5,65 +5,90 @@
55

66
#include <boost/beast.hpp>
77
#include <boost/beast/http.hpp>
8+
#include <foxy/server_session.hpp>
89
#include <vector>
910

11+
#include <boost/asio/yield.hpp>
12+
1013
namespace beast = boost::beast; // from <boost/beast.hpp>
1114
namespace http = beast::http; // from <boost/beast/http.hpp>
1215
namespace net = boost::asio; // from <boost/asio.hpp>
1316
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp>
1417

15-
class Session : public std::enable_shared_from_this<Session> {
16-
beast::tcp_stream stream_;
17-
beast::flat_buffer buffer_{8192};
18-
http::request<http::string_body> request_;
19-
EntityManager& manager_;
20-
std::vector<std::string> capabilities_;
21-
std::function<void()> on_shutdown_cb_;
22-
bool shutdown_requested_;
23-
launchdarkly::Logger& logger_;
24-
18+
class Session : boost::asio::coroutine {
2519
public:
20+
using Request = http::request<http::string_body>;
21+
using Response = http::response<http::string_body>;
22+
23+
struct Frame {
24+
Request request_;
25+
Response resp_;
26+
};
27+
2628
/**
2729
* Constructs a session, which provides a REST API.
28-
* @param socket Connected socket.
30+
* @param session The HTTP session.
2931
* @param manager Manager through which entities can be created/destroyed.
3032
* @param caps Test service capabilities to advertise.
3133
* @param logger Logger.
3234
*/
33-
explicit Session(tcp::socket&& socket,
34-
EntityManager& manager,
35-
std::vector<std::string> caps,
36-
launchdarkly::Logger& logger);
35+
Session(foxy::server_session& session,
36+
EntityManager& manager,
37+
std::vector<std::string>& caps,
38+
launchdarkly::Logger& logger);
3739

38-
~Session();
40+
template <class Self>
41+
auto operator()(Self& self,
42+
boost::system::error_code ec = {},
43+
std::size_t const bytes_transferred = 0) -> void {
44+
using launchdarkly::LogLevel;
45+
auto& f = *frame_;
3946

40-
/**
41-
* Set a callback to be invoked when a REST client requests shutdown.
42-
*/
43-
template <typename Callback>
44-
void on_shutdown(Callback cb) {
45-
on_shutdown_cb_ = cb;
46-
}
47-
/**
48-
* Begin waiting for requests.
49-
*/
50-
void start();
51-
/**
52-
* Stop waiting for requests and close the session.
53-
*/
54-
void stop();
47+
reenter(*this) {
48+
while (true) {
49+
f.resp_ = {};
50+
f.request_ = {};
5551

56-
private:
57-
http::message_generator handle_request(
58-
http::request<http::string_body>&& req);
59-
void do_read();
52+
yield session_.async_read(f.request_, std::move(self));
53+
if (ec) {
54+
LD_LOG(logger_, LogLevel::kWarn)
55+
<< "session: read: " << ec.what();
56+
break;
57+
}
58+
59+
if (auto response = generate_response(f.request_)) {
60+
f.resp_ = *response;
61+
} else {
62+
LD_LOG(logger_, LogLevel::kWarn)
63+
<< "session: shutdown requested by client";
64+
std::exit(0);
65+
}
6066

61-
void do_stop(char const* reason);
62-
void on_read(beast::error_code ec, std::size_t bytes_transferred);
67+
yield session_.async_write(f.resp_, std::move(self));
6368

64-
void send_response(http::message_generator&& msg);
69+
if (ec) {
70+
LD_LOG(logger_, LogLevel::kWarn)
71+
<< "session: write: " << ec.what();
72+
break;
73+
}
74+
75+
if (!f.request_.keep_alive()) {
76+
break;
77+
}
78+
}
79+
80+
return self.complete({}, 0);
81+
}
82+
}
6583

66-
void on_write(bool keep_alive,
67-
beast::error_code ec,
68-
std::size_t bytes_transferred);
84+
std::optional<Response> generate_response(Request& req);
85+
86+
private:
87+
foxy::server_session& session_;
88+
std::unique_ptr<Frame> frame_;
89+
EntityManager& manager_;
90+
std::vector<std::string>& caps_;
91+
launchdarkly::Logger& logger_;
6992
};
93+
94+
#include <boost/asio/unyield.hpp>

apps/sse-contract-tests/src/main.cpp

Lines changed: 24 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44

55
#include <boost/asio/io_context.hpp>
66
#include <boost/asio/signal_set.hpp>
7+
#include <boost/asio/spawn.hpp>
78
#include <boost/beast.hpp>
9+
#include <boost/lexical_cast.hpp>
810

911
#include <memory>
1012

@@ -19,31 +21,43 @@ int main(int argc, char* argv[]) {
1921
launchdarkly::Logger logger{
2022
std::make_unique<ConsoleBackend>("sse-contract-tests")};
2123

22-
std::string port = "8123";
24+
const std::string default_port = "8123";
25+
std::string port = default_port;
2326
if (argc == 2) {
2427
port =
2528
argv[1]; // NOLINT(cppcoreguidelines-pro-bounds-pointer-arithmetic)
2629
}
30+
2731
try {
2832
net::io_context ioc{1};
2933

30-
auto srv = std::make_shared<server>(ioc, "0.0.0.0", port, logger);
31-
srv->add_capability("headers");
32-
srv->add_capability("comments");
33-
srv->add_capability("report");
34-
srv->add_capability("post");
35-
srv->add_capability("read-timeout");
36-
srv->run();
34+
auto p = boost::lexical_cast<unsigned short>(port);
35+
server srv(ioc, "0.0.0.0", p, logger);
36+
37+
srv.add_capability("headers");
38+
srv.add_capability("comments");
39+
srv.add_capability("report");
40+
srv.add_capability("post");
41+
srv.add_capability("read-timeout");
3742

3843
net::signal_set signals{ioc, SIGINT, SIGTERM};
39-
signals.async_wait([&](beast::error_code const&, int) {
44+
45+
boost::asio::spawn(ioc.get_executor(), [&](auto yield) mutable {
46+
signals.async_wait(yield);
4047
LD_LOG(logger, LogLevel::kInfo) << "shutting down..";
41-
ioc.stop();
48+
srv.shutdown();
4249
});
4350

4451
ioc.run();
4552
LD_LOG(logger, LogLevel::kInfo) << "bye!";
4653

54+
} catch (boost::bad_lexical_cast&) {
55+
LD_LOG(logger, LogLevel::kError)
56+
<< "invalid port (" << port
57+
<< "), provide a number (no arguments defaults "
58+
"to port "
59+
<< default_port << ")";
60+
return EXIT_FAILURE;
4761
} catch (std::exception const& e) {
4862
LD_LOG(logger, LogLevel::kError) << e.what();
4963
return EXIT_FAILURE;

apps/sse-contract-tests/src/server.cpp

Lines changed: 9 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "session.hpp"
33

44
#include <boost/asio/dispatch.hpp>
5+
#include <boost/asio/ip/address.hpp>
56
#include <boost/asio/placeholders.hpp>
67
#include <boost/asio/strand.hpp>
78
#include <boost/bind.hpp>
@@ -11,49 +12,17 @@ using launchdarkly::LogLevel;
1112

1213
server::server(net::io_context& ioc,
1314
std::string const& address,
14-
std::string const& port,
15+
unsigned short port,
1516
launchdarkly::Logger& logger)
16-
: ioc_{ioc},
17-
acceptor_{ioc},
17+
: listener_{ioc.get_executor(),
18+
tcp::endpoint(boost::asio::ip::make_address(address), port)},
1819
entity_manager_{ioc.get_executor(), logger},
19-
caps_{},
2020
logger_{logger} {
21-
beast::error_code ec;
22-
23-
tcp::resolver resolver{ioc_};
24-
tcp::endpoint endpoint = *resolver.resolve(address, port, ec).begin();
25-
if (ec) {
26-
fail(ec, "resolve");
27-
return;
28-
}
29-
acceptor_.open(endpoint.protocol(), ec);
30-
if (ec) {
31-
fail(ec, "open");
32-
return;
33-
}
34-
acceptor_.set_option(tcp::acceptor::reuse_address(true), ec);
35-
if (ec) {
36-
fail(ec, "set_option");
37-
return;
38-
}
39-
acceptor_.bind(endpoint, ec);
40-
if (ec) {
41-
fail(ec, "bind");
42-
return;
43-
}
44-
acceptor_.listen(net::socket_base::max_listen_connections, ec);
45-
if (ec) {
46-
fail(ec, "listen");
47-
return;
48-
}
49-
5021
LD_LOG(logger_, LogLevel::kInfo)
5122
<< "server: listening on " << address << ":" << port;
52-
}
53-
54-
void server::fail(beast::error_code ec, char const* what) {
55-
LD_LOG(logger_, LogLevel::kError)
56-
<< "server: " << what << ": " << ec.message();
23+
listener_.async_accept([this](auto& server) {
24+
return Session(server, entity_manager_, caps_, logger_);
25+
});
5726
}
5827

5928
void server::add_capability(std::string cap) {
@@ -62,39 +31,6 @@ void server::add_capability(std::string cap) {
6231
caps_.push_back(std::move(cap));
6332
}
6433

65-
void server::run() {
66-
LD_LOG(logger_, LogLevel::kDebug) << "server: run requested";
67-
net::dispatch(
68-
acceptor_.get_executor(),
69-
beast::bind_front_handler(&server::do_accept, shared_from_this()));
70-
}
71-
72-
void server::do_accept() {
73-
LD_LOG(logger_, LogLevel::kDebug) << "server: waiting for connection";
74-
acceptor_.async_accept(
75-
net::make_strand(ioc_),
76-
beast::bind_front_handler(&server::on_accept, shared_from_this()));
77-
}
78-
79-
void server::on_accept(boost::system::error_code const& ec,
80-
tcp::socket socket) {
81-
if (!acceptor_.is_open()) {
82-
return;
83-
}
84-
if (ec) {
85-
fail(ec, "accept");
86-
return;
87-
}
88-
89-
auto session = std::make_shared<Session>(std::move(socket), entity_manager_,
90-
caps_, logger_);
91-
92-
session->on_shutdown([this]() {
93-
LD_LOG(logger_, LogLevel::kDebug) << "server: terminating";
94-
ioc_.stop();
95-
});
96-
97-
session->start();
98-
99-
do_accept();
34+
void server::shutdown() {
35+
listener_.shutdown();
10036
}

0 commit comments

Comments
 (0)