Skip to content

Commit be59e19

Browse files
authored
feat: automatic retries in SSE client (#101)
Adds support for automatic retries to the SSE client. I've removed the `skip "reconnection"` suppression from the CI tests.
1 parent 5d08380 commit be59e19

File tree

4 files changed

+174
-56
lines changed

4 files changed

+174
-56
lines changed

.github/workflows/sse.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,4 @@ jobs:
3636
with:
3737
repo: 'sse-contract-tests'
3838
test_service_port: ${{ env.TEST_SERVICE_PORT }}
39-
extra_params: '-skip HTTP -skip reconnection'
39+
extra_params: '-skip HTTP'

apps/sse-contract-tests/src/main.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@ int main(int argc, char* argv[]) {
3030
try {
3131
net::io_context ioc{1};
3232

33-
auto p = boost::lexical_cast<unsigned short>(port);
34-
server srv(ioc, "0.0.0.0", p, logger);
33+
server srv(ioc, "0.0.0.0", boost::lexical_cast<unsigned short>(port),
34+
logger);
3535

3636
srv.add_capability("headers");
3737
srv.add_capability("comments");
3838
srv.add_capability("report");
3939
srv.add_capability("post");
40-
srv.add_capability("read-timeout");
40+
srv.add_capability("reconnection");
4141

4242
net::signal_set signals{ioc, SIGINT, SIGTERM};
4343

libs/server-sent-events/src/client.cpp

Lines changed: 162 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
#include <boost/asio/ip/tcp.hpp>
2+
#include <boost/asio/steady_timer.hpp>
23
#include <boost/asio/strand.hpp>
34
#include <boost/asio/use_future.hpp>
45
#include <foxy/client_session.hpp>
56
#include <launchdarkly/sse/client.hpp>
67

8+
#include "backoff.hpp"
79
#include "parser.hpp"
810

911
#include <boost/beast/http/parser.hpp>
@@ -16,6 +18,7 @@
1618
#include <chrono>
1719
#include <iostream>
1820
#include <memory>
21+
#include <sstream>
1922

2023
namespace launchdarkly::sse {
2124

@@ -40,6 +43,11 @@ static boost::optional<net::ssl::context&> ToOptRef(
4043

4144
class FoxyClient : public Client,
4245
public std::enable_shared_from_this<FoxyClient> {
46+
private:
47+
using cb = std::function<void(launchdarkly::sse::Event)>;
48+
using body = launchdarkly::sse::detail::EventBody<cb>;
49+
using response = http::response<body>;
50+
4351
public:
4452
FoxyClient(boost::asio::any_io_executor executor,
4553
http::request<http::string_body> req,
@@ -62,16 +70,57 @@ class FoxyClient : public Client,
6270
launchdarkly::foxy::session_opts{
6371
ToOptRef(ssl_context_),
6472
connect_timeout.value_or(kNoTimeout)}),
73+
backoff_(std::chrono::seconds(1), std::chrono::seconds(30)),
74+
last_event_id_(std::nullopt),
75+
backoff_timer_(session_.get_executor()),
76+
event_receiver_(std::move(receiver)),
6577
logger_(std::move(logger)) {
66-
// SSE body will never end unless an error occurs, so we shouldn't set a
67-
// size limit.
68-
body_parser_.body_limit(boost::none);
69-
body_parser_.get().body().on_event(std::move(receiver));
78+
create_parser();
7079
}
7180

72-
void fail(boost::system::error_code ec, std::string const& what) {
73-
logger_("sse-client: " + what + ": " + ec.message());
74-
async_shutdown(nullptr);
81+
/** The body parser is recreated each time a connection is made because its
82+
* internal state cannot be explicitly reset.
83+
*
84+
* Since SSE body will never end unless
85+
* an error occurs, the body size limit must be removed.
86+
*/
87+
void create_parser() {
88+
body_parser_.emplace();
89+
body_parser_->body_limit(boost::none);
90+
body_parser_->get().body().on_event(event_receiver_);
91+
}
92+
93+
/**
94+
* Called whenever the connection needs to be reattempted, triggering
95+
* a timed wait for the current backoff duration.
96+
*
97+
* The body parser's last SSE event ID must be cached so it can be added
98+
* as a header on the next request (since the parser is destroyed.)
99+
*/
100+
void do_backoff(std::string const& reason) {
101+
backoff_.fail();
102+
103+
std::stringstream msg;
104+
msg << "backing off in ("
105+
<< std::chrono::duration_cast<std::chrono::seconds>(
106+
backoff_.delay())
107+
.count()
108+
<< ") seconds due to " << reason;
109+
110+
logger_(msg.str());
111+
112+
last_event_id_ = body_parser_->get().body().last_event_id();
113+
create_parser();
114+
backoff_timer_.expires_from_now(backoff_.delay());
115+
backoff_timer_.async_wait(beast::bind_front_handler(
116+
&FoxyClient::on_backoff, shared_from_this()));
117+
}
118+
119+
void on_backoff(boost::system::error_code ec) {
120+
if (ec == boost::asio::error::operation_aborted) {
121+
return;
122+
}
123+
run();
75124
}
76125

77126
void run() override {
@@ -81,24 +130,19 @@ class FoxyClient : public Client,
81130
shared_from_this()));
82131
}
83132

84-
void async_shutdown(std::function<void()> completion) override {
85-
session_.async_shutdown(beast::bind_front_handler(
86-
&FoxyClient::on_shutdown, std::move(completion)));
87-
}
88-
89-
static void on_shutdown(std::function<void()> completion,
90-
boost::system::error_code ec) {
91-
boost::ignore_unused(ec);
92-
if (completion) {
93-
completion();
94-
}
95-
}
96-
97133
void on_connect(boost::system::error_code ec) {
134+
if (ec == boost::asio::error::operation_aborted) {
135+
return;
136+
}
98137
if (ec) {
99-
return fail(ec, "connect");
138+
return do_backoff(ec.what());
100139
}
101140

141+
if (last_event_id_ && !last_event_id_->empty()) {
142+
req_.set("last-event-id", *last_event_id_);
143+
} else {
144+
req_.erase("last-event-id");
145+
}
102146
session_.opts.timeout = write_timeout_.value_or(kNoTimeout);
103147
session_.async_write(req_,
104148
beast::bind_front_handler(&FoxyClient::on_write,
@@ -107,46 +151,117 @@ class FoxyClient : public Client,
107151

108152
void on_write(boost::system::error_code ec, std::size_t amount) {
109153
boost::ignore_unused(amount);
154+
if (ec == boost::asio::error::operation_aborted) {
155+
return;
156+
}
110157
if (ec) {
111-
return fail(ec, "send request");
158+
return do_backoff(ec.what());
112159
}
160+
113161
session_.opts.timeout = read_timeout_.value_or(kNoTimeout);
114162
session_.async_read_header(
115-
body_parser_, beast::bind_front_handler(&FoxyClient::on_headers,
116-
shared_from_this()));
163+
*body_parser_, beast::bind_front_handler(&FoxyClient::on_headers,
164+
shared_from_this()));
117165
}
118166

119167
void on_headers(boost::system::error_code ec, std::size_t amount) {
120168
boost::ignore_unused(amount);
169+
if (ec == boost::asio::error::operation_aborted) {
170+
return;
171+
}
121172
if (ec) {
122-
return fail(ec, "read header");
173+
return do_backoff(ec.what());
123174
}
124175

125-
if (!body_parser_.is_header_done()) {
126-
session_.async_read_header(
127-
body_parser_, beast::bind_front_handler(&FoxyClient::on_headers,
128-
shared_from_this()));
129-
return;
176+
if (!body_parser_->is_header_done()) {
177+
/* keep reading headers */
178+
return session_.async_read_header(
179+
*body_parser_,
180+
beast::bind_front_handler(&FoxyClient::on_headers,
181+
shared_from_this()));
130182
}
131183

132-
auto response = body_parser_.get();
133-
if (beast::http::to_status_class(response.result()) ==
134-
beast::http::status_class::successful) {
135-
session_.async_read(body_parser_, beast::bind_front_handler(
136-
&FoxyClient::on_read_complete,
137-
shared_from_this()));
138-
} else {
139-
return fail(ec, "read response");
184+
/* headers are finished, body is ready */
185+
auto response = body_parser_->get();
186+
auto status_class = beast::http::to_status_class(response.result());
187+
188+
if (status_class == beast::http::status_class::successful) {
189+
if (!correct_content_type(response)) {
190+
return do_backoff("invalid Content-Type");
191+
}
192+
193+
backoff_.succeed();
194+
return session_.async_read(
195+
*body_parser_,
196+
beast::bind_front_handler(&FoxyClient::on_read_body,
197+
shared_from_this()));
198+
}
199+
200+
if (status_class == beast::http::status_class::client_error) {
201+
if (recoverable_client_error(response.result())) {
202+
return do_backoff(backoff_reason(response.result()));
203+
}
204+
205+
// TODO: error callback
206+
207+
return;
140208
}
209+
210+
do_backoff(backoff_reason(response.result()));
211+
}
212+
213+
static std::string backoff_reason(beast::http::status status) {
214+
std::stringstream ss;
215+
ss << "HTTP status " << int(status) << " (" << status << ")";
216+
return ss.str();
141217
}
142218

143-
void on_read_complete(boost::system::error_code ec, std::size_t amount) {
219+
void on_read_body(boost::system::error_code ec, std::size_t amount) {
144220
boost::ignore_unused(amount);
145221
if (ec == boost::asio::error::operation_aborted) {
146-
async_shutdown(nullptr);
147-
} else {
148-
return fail(ec, "read body");
222+
return;
223+
}
224+
do_backoff(ec.what());
225+
}
226+
227+
void async_shutdown(std::function<void()> completion) override {
228+
boost::asio::post(session_.get_executor(),
229+
beast::bind_front_handler(&FoxyClient::do_shutdown,
230+
shared_from_this(),
231+
std::move(completion)));
232+
}
233+
234+
void do_shutdown(std::function<void()> completion) {
235+
session_.async_shutdown(beast::bind_front_handler(
236+
&FoxyClient::on_shutdown, std::move(completion)));
237+
}
238+
239+
static void on_shutdown(std::function<void()> completion,
240+
boost::system::error_code ec) {
241+
boost::ignore_unused(ec);
242+
if (completion) {
243+
completion();
244+
}
245+
}
246+
247+
void fail(boost::system::error_code ec, std::string const& what) {
248+
logger_("sse-client: " + what + ": " + ec.message());
249+
async_shutdown(nullptr);
250+
}
251+
252+
static bool recoverable_client_error(beast::http::status status) {
253+
return (status == beast::http::status::bad_request ||
254+
status == beast::http::status::request_timeout ||
255+
status == beast::http::status::too_many_requests);
256+
}
257+
258+
static bool correct_content_type(FoxyClient::response const& response) {
259+
if (auto content_type = response.find("content-type");
260+
content_type != response.end()) {
261+
return content_type->value().find("text/event-stream") !=
262+
content_type->value().npos;
149263
}
264+
return false;
150265
}
151266

152267
private:
@@ -157,10 +272,12 @@ class FoxyClient : public Client,
157272
std::optional<std::chrono::milliseconds> read_timeout_;
158273
std::optional<std::chrono::milliseconds> write_timeout_;
159274
http::request<http::string_body> req_;
160-
using cb = std::function<void(launchdarkly::sse::Event)>;
161-
using body = launchdarkly::sse::detail::EventBody<cb>;
162-
http::response_parser<body> body_parser_;
275+
Builder::EventReceiver event_receiver_;
276+
std::optional<http::response_parser<body> > body_parser_;
163277
launchdarkly::foxy::client_session session_;
278+
std::optional<std::string> last_event_id_;
279+
Backoff backoff_;
280+
boost::asio::steady_timer backoff_timer_;
164281
Builder::LogCallback logger_;
165282
};
166283

libs/server-sent-events/src/parser.hpp

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,11 @@ class EventBody<EventReceiver>::value_type {
4343

4444
EventReceiver events_;
4545

46+
std::optional<std::string> last_event_id_;
47+
4648
public:
47-
void on_event(EventReceiver&& receiver) { events_ = std::move(receiver); }
49+
void on_event(EventReceiver receiver) { events_ = std::move(receiver); }
50+
std::optional<std::string> last_event_id() const { return last_event_id_; }
4851
};
4952

5053
template <class EventReceiver>
@@ -54,7 +57,7 @@ struct EventBody<EventReceiver>::reader {
5457
std::optional<std::string> buffered_line_;
5558
std::deque<std::string> complete_lines_;
5659
bool begin_CR_;
57-
std::optional<std::string> last_event_id_;
60+
5861
std::optional<Event> event_;
5962

6063
public:
@@ -64,7 +67,6 @@ struct EventBody<EventReceiver>::reader {
6467
buffered_line_(),
6568
complete_lines_(),
6669
begin_CR_(false),
67-
last_event_id_(),
6870
event_() {
6971
boost::ignore_unused(h);
7072
}
@@ -80,7 +82,6 @@ struct EventBody<EventReceiver>::reader {
8082
void init(boost::optional<std::uint64_t> const& content_length,
8183
error_code& ec) {
8284
boost::ignore_unused(content_length);
83-
8485
// The specification requires this to indicate "no error"
8586
ec = {};
8687
}
@@ -207,7 +208,7 @@ struct EventBody<EventReceiver>::reader {
207208

208209
if (!event_.has_value()) {
209210
event_.emplace(Event{});
210-
event_->id = last_event_id_;
211+
event_->id = body_.last_event_id_;
211212
}
212213

213214
if (field.first == "event") {
@@ -220,8 +221,8 @@ struct EventBody<EventReceiver>::reader {
220221
// ignored.
221222
continue;
222223
}
223-
last_event_id_ = field.second;
224-
event_->id = last_event_id_;
224+
body_.last_event_id_ = field.second;
225+
event_->id = body_.last_event_id_;
225226
} else if (field.first == "retry") {
226227
// todo: implement
227228
}

0 commit comments

Comments
 (0)