Skip to content

Commit e12664f

Browse files
authored
fix: stream connections longer than 5 minutes are dropped (#244)
This fixes [streaming connection dropping after 5 minutes](#243). The problem was applying a 5 minute timeout to an `async_read` _operation_, when the actual intent was to apply it to the act of receiving _any data_. The timeout exists to prevent the client from hanging infinitely if the server stops responding. LaunchDarkly sends heartbeats to clear the timer. With `async_read`, the operation won't complete until the response body is finished. Since the body will never finish until the client shuts down (or server interrupts it), it's not possible to institute a timeout. The solution is to use `async_read_some`, which completes whenever data arrives. This way, we can assign the 5 minute timeout as an upper bound on receiving any data, as was originally intended. ----- A side effect of this change was breaking the `reconnection` SSE contract tests, which proved to be somewhat of a rabbithole. The upshot is that we weren't handling chunked encoding properly when chunked encoding _ends_. The original code assumes we either terminate an async read with an error, or it goes on forever. In fact it should perform the backoff algorithm if we detect the end of chunked encoding.
1 parent f74260d commit e12664f

File tree

11 files changed

+232
-66
lines changed

11 files changed

+232
-66
lines changed

contract-tests/sse-contract-tests/include/event_outbox.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,11 @@ class EventOutbox : public std::enable_shared_from_this<EventOutbox> {
7272
private:
7373
RequestType build_request(
7474
std::size_t counter,
75-
std::variant<launchdarkly::sse::Event, launchdarkly::sse::Error> ev);
75+
std::variant<launchdarkly::sse::Event, launchdarkly::sse::Error> event);
7676
void on_resolve(beast::error_code ec, tcp::resolver::results_type results);
7777
void on_connect(beast::error_code ec,
7878
tcp::resolver::results_type::endpoint_type);
7979
void on_flush_timer(boost::system::error_code ec);
8080
void on_write(beast::error_code ec, std::size_t);
81-
void do_shutdown(beast::error_code ec, std::string what);
81+
void do_shutdown(beast::error_code ec);
8282
};

contract-tests/sse-contract-tests/src/entity_manager.cpp

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,24 +27,30 @@ std::optional<std::string> EntityManager::create(ConfigParams const& params) {
2727
}
2828

2929
if (params.body) {
30-
client_builder.body(std::move(*params.body));
30+
client_builder.body(*params.body);
3131
}
3232

3333
if (params.readTimeoutMs) {
3434
client_builder.read_timeout(
3535
std::chrono::milliseconds(*params.readTimeoutMs));
3636
}
3737

38+
if (params.initialDelayMs) {
39+
client_builder.initial_reconnect_delay(
40+
std::chrono::milliseconds(*params.initialDelayMs));
41+
}
42+
3843
client_builder.logger([this](std::string msg) {
3944
LD_LOG(logger_, LogLevel::kDebug) << std::move(msg);
4045
});
4146

42-
client_builder.receiver([copy = poster](launchdarkly::sse::Event e) {
43-
copy->post_event(std::move(e));
47+
client_builder.receiver([copy = poster](launchdarkly::sse::Event event) {
48+
copy->post_event(std::move(event));
4449
});
4550

46-
client_builder.errors(
47-
[copy = poster](launchdarkly::sse::Error e) { copy->post_error(e); });
51+
client_builder.errors([copy = poster](launchdarkly::sse::Error event) {
52+
copy->post_error(event);
53+
});
4854

4955
auto client = client_builder.build();
5056
if (!client) {
@@ -53,7 +59,7 @@ std::optional<std::string> EntityManager::create(ConfigParams const& params) {
5359
return std::nullopt;
5460
}
5561

56-
client->run();
62+
client->async_connect();
5763

5864
entities_.emplace(id, std::make_pair(client, poster));
5965
return id;

contract-tests/sse-contract-tests/src/event_outbox.cpp

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ auto const kOutboxCapacity = 1023;
1414
EventOutbox::EventOutbox(net::any_io_executor executor,
1515
std::string callback_url)
1616
: callback_url_{std::move(callback_url)},
17-
callback_port_{},
18-
callback_host_{},
1917
callback_counter_{0},
2018
executor_{executor},
2119
resolver_{executor},
@@ -29,7 +27,7 @@ EventOutbox::EventOutbox(net::any_io_executor executor,
2927
callback_port_ = uri_components->port();
3028
}
3129

32-
void EventOutbox::do_shutdown(beast::error_code ec, std::string what) {
30+
void EventOutbox::do_shutdown(beast::error_code ec) {
3331
event_stream_.socket().shutdown(tcp::socket::shutdown_both, ec);
3432
flush_timer_.cancel();
3533
}
@@ -54,20 +52,18 @@ void EventOutbox::run() {
5452

5553
void EventOutbox::stop() {
5654
beast::error_code ec = net::error::basic_errors::operation_aborted;
57-
std::string reason = "stop";
5855
shutdown_ = true;
59-
net::post(executor_,
60-
beast::bind_front_handler(&EventOutbox::do_shutdown,
61-
shared_from_this(), ec, reason));
56+
net::post(executor_, beast::bind_front_handler(&EventOutbox::do_shutdown,
57+
shared_from_this(), ec));
6258
}
6359

6460
EventOutbox::RequestType EventOutbox::build_request(
6561
std::size_t counter,
66-
std::variant<launchdarkly::sse::Event, launchdarkly::sse::Error> ev) {
62+
std::variant<launchdarkly::sse::Event, launchdarkly::sse::Error> event) {
6763
RequestType req;
6864

6965
req.set(http::field::host, callback_host_);
70-
req.method(http::verb::get);
66+
req.method(http::verb::post);
7167
req.target(callback_url_ + "/" + std::to_string(counter));
7268

7369
nlohmann::json json;
@@ -93,13 +89,15 @@ EventOutbox::RequestType EventOutbox::build_request(
9389
break;
9490
case Error::UnrecoverableClientError:
9591
msg.comment = "unrecoverable client error";
92+
case Error::ReadTimeout:
93+
msg.comment = "read timeout";
9694
default:
9795
msg.comment = "unspecified error";
9896
}
9997
json = msg;
10098
}
10199
},
102-
std::move(ev));
100+
std::move(event));
103101

104102
req.body() = json.dump();
105103
req.prepare_payload();
@@ -109,7 +107,7 @@ EventOutbox::RequestType EventOutbox::build_request(
109107
void EventOutbox::on_resolve(beast::error_code ec,
110108
tcp::resolver::results_type results) {
111109
if (ec) {
112-
return do_shutdown(ec, "resolve");
110+
return do_shutdown(ec);
113111
}
114112

115113
beast::get_lowest_layer(event_stream_)
@@ -121,7 +119,7 @@ void EventOutbox::on_resolve(beast::error_code ec,
121119
void EventOutbox::on_connect(beast::error_code ec,
122120
tcp::resolver::results_type::endpoint_type) {
123121
if (ec) {
124-
return do_shutdown(ec, "connect");
122+
return do_shutdown(ec);
125123
}
126124

127125
boost::system::error_code dummy;
@@ -131,7 +129,7 @@ void EventOutbox::on_connect(beast::error_code ec,
131129

132130
void EventOutbox::on_flush_timer(boost::system::error_code ec) {
133131
if (ec && shutdown_) {
134-
return do_shutdown(ec, "flush");
132+
return do_shutdown(ec);
135133
}
136134

137135
if (!outbox_.empty()) {
@@ -154,7 +152,7 @@ void EventOutbox::on_flush_timer(boost::system::error_code ec) {
154152

155153
void EventOutbox::on_write(beast::error_code ec, std::size_t) {
156154
if (ec) {
157-
return do_shutdown(ec, "write");
155+
return do_shutdown(ec);
158156
}
159157
outbox_.pop();
160158
on_flush_timer(boost::system::error_code{});

contract-tests/sse-contract-tests/src/main.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ int main(int argc, char* argv[]) {
2020
launchdarkly::Logger logger{
2121
std::make_unique<ConsoleBackend>("sse-contract-tests")};
2222

23-
const std::string default_port = "8123";
23+
std::string const default_port = "8123";
2424
std::string port = default_port;
2525
if (argc == 2) {
2626
port =
@@ -38,6 +38,7 @@ int main(int argc, char* argv[]) {
3838
srv.add_capability("report");
3939
srv.add_capability("post");
4040
srv.add_capability("reconnection");
41+
srv.add_capability("read-timeout");
4142

4243
net::signal_set signals{ioc, SIGINT, SIGTERM};
4344

libs/client-sdk/src/data_sources/streaming_data_source.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ static char const* DataSourceErrorToString(launchdarkly::sse::Error error) {
2727
return "server responded with an invalid redirection";
2828
case sse::Error::UnrecoverableClientError:
2929
return "unrecoverable client-side error";
30+
case sse::Error::ReadTimeout:
31+
return "read timeout reached";
3032
}
3133
}
3234

@@ -138,7 +140,7 @@ void StreamingDataSource::Start() {
138140

139141
client_builder.logger([weak_self](auto msg) {
140142
if (auto self = weak_self.lock()) {
141-
LD_LOG(self->logger_, LogLevel::kDebug) << msg;
143+
LD_LOG(self->logger_, LogLevel::kDebug) << "sse-client: " << msg;
142144
}
143145
});
144146

@@ -163,7 +165,7 @@ void StreamingDataSource::Start() {
163165
kCouldNotParseEndpoint);
164166
return;
165167
}
166-
client_->run();
168+
client_->async_connect();
167169
}
168170

169171
void StreamingDataSource::ShutdownAsync(std::function<void()> completion) {

libs/common/src/config/logging_builder.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,9 @@ LoggingBuilder::BasicLogging& LoggingBuilder::BasicLogging::Tag(
6060
return *this;
6161
}
6262
LoggingBuilder::BasicLogging::BasicLogging()
63-
: level_(Defaults<AnySDK>::LogLevel()), tag_(Defaults<AnySDK>::LogTag()) {}
63+
: level_(GetLogLevelEnum(std::getenv("LD_LOG_LEVEL"),
64+
Defaults<AnySDK>::LogLevel())),
65+
tag_(Defaults<AnySDK>::LogTag()) {}
6466

6567
LoggingBuilder::CustomLogging& LoggingBuilder::CustomLogging::Backend(
6668
std::shared_ptr<ILogBackend> backend) {

libs/server-sent-events/include/launchdarkly/sse/client.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ class Builder {
160160
class Client {
161161
public:
162162
virtual ~Client() = default;
163-
virtual void run() = 0;
163+
virtual void async_connect() = 0;
164164
virtual void async_shutdown(std::function<void()> completion) = 0;
165165
};
166166

libs/server-sent-events/include/launchdarkly/sse/error.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ enum class Error {
66
NoContent = 1,
77
InvalidRedirectLocation = 2,
88
UnrecoverableClientError = 3,
9+
ReadTimeout = 4,
910
};
1011
}

0 commit comments

Comments
 (0)