-
Notifications
You must be signed in to change notification settings - Fork 3
fix: stream connections longer than 5 minutes are dropped #244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f5c907d
d94c4c2
1e780ce
4f7dc9e
300acdc
ed5ff7f
5e28860
3765fcb
24fcdfc
249e147
d8e7b1d
53567e4
346ff3d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,24 +27,30 @@ std::optional<std::string> EntityManager::create(ConfigParams const& params) { | |
} | ||
|
||
if (params.body) { | ||
client_builder.body(std::move(*params.body)); | ||
client_builder.body(*params.body); | ||
} | ||
|
||
if (params.readTimeoutMs) { | ||
client_builder.read_timeout( | ||
std::chrono::milliseconds(*params.readTimeoutMs)); | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This builder option was missing, but it's already implemented. Easy win to speed up tests. |
||
if (params.initialDelayMs) { | ||
client_builder.initial_reconnect_delay( | ||
std::chrono::milliseconds(*params.initialDelayMs)); | ||
} | ||
|
||
client_builder.logger([this](std::string msg) { | ||
LD_LOG(logger_, LogLevel::kDebug) << std::move(msg); | ||
}); | ||
|
||
client_builder.receiver([copy = poster](launchdarkly::sse::Event e) { | ||
copy->post_event(std::move(e)); | ||
client_builder.receiver([copy = poster](launchdarkly::sse::Event event) { | ||
copy->post_event(std::move(event)); | ||
}); | ||
|
||
client_builder.errors( | ||
[copy = poster](launchdarkly::sse::Error e) { copy->post_error(e); }); | ||
client_builder.errors([copy = poster](launchdarkly::sse::Error event) { | ||
copy->post_error(event); | ||
}); | ||
|
||
auto client = client_builder.build(); | ||
if (!client) { | ||
|
@@ -53,7 +59,7 @@ std::optional<std::string> EntityManager::create(ConfigParams const& params) { | |
return std::nullopt; | ||
} | ||
|
||
client->run(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Renamed |
||
client->async_connect(); | ||
|
||
entities_.emplace(id, std::make_pair(client, poster)); | ||
return id; | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,8 +14,6 @@ auto const kOutboxCapacity = 1023; | |
EventOutbox::EventOutbox(net::any_io_executor executor, | ||
std::string callback_url) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lints. |
||
: callback_url_{std::move(callback_url)}, | ||
callback_port_{}, | ||
callback_host_{}, | ||
callback_counter_{0}, | ||
executor_{executor}, | ||
resolver_{executor}, | ||
|
@@ -29,7 +27,7 @@ EventOutbox::EventOutbox(net::any_io_executor executor, | |
callback_port_ = uri_components->port(); | ||
} | ||
|
||
void EventOutbox::do_shutdown(beast::error_code ec, std::string what) { | ||
void EventOutbox::do_shutdown(beast::error_code ec) { | ||
event_stream_.socket().shutdown(tcp::socket::shutdown_both, ec); | ||
flush_timer_.cancel(); | ||
} | ||
|
@@ -54,20 +52,18 @@ void EventOutbox::run() { | |
|
||
void EventOutbox::stop() { | ||
beast::error_code ec = net::error::basic_errors::operation_aborted; | ||
std::string reason = "stop"; | ||
shutdown_ = true; | ||
net::post(executor_, | ||
beast::bind_front_handler(&EventOutbox::do_shutdown, | ||
shared_from_this(), ec, reason)); | ||
net::post(executor_, beast::bind_front_handler(&EventOutbox::do_shutdown, | ||
shared_from_this(), ec)); | ||
} | ||
|
||
EventOutbox::RequestType EventOutbox::build_request( | ||
std::size_t counter, | ||
std::variant<launchdarkly::sse::Event, launchdarkly::sse::Error> ev) { | ||
std::variant<launchdarkly::sse::Event, launchdarkly::sse::Error> event) { | ||
RequestType req; | ||
|
||
req.set(http::field::host, callback_host_); | ||
req.method(http::verb::get); | ||
req.method(http::verb::post); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
req.target(callback_url_ + "/" + std::to_string(counter)); | ||
|
||
nlohmann::json json; | ||
|
@@ -93,13 +89,15 @@ EventOutbox::RequestType EventOutbox::build_request( | |
break; | ||
case Error::UnrecoverableClientError: | ||
msg.comment = "unrecoverable client error"; | ||
case Error::ReadTimeout: | ||
msg.comment = "read timeout"; | ||
default: | ||
msg.comment = "unspecified error"; | ||
} | ||
json = msg; | ||
} | ||
}, | ||
std::move(ev)); | ||
std::move(event)); | ||
|
||
req.body() = json.dump(); | ||
req.prepare_payload(); | ||
|
@@ -109,7 +107,7 @@ EventOutbox::RequestType EventOutbox::build_request( | |
void EventOutbox::on_resolve(beast::error_code ec, | ||
tcp::resolver::results_type results) { | ||
if (ec) { | ||
return do_shutdown(ec, "resolve"); | ||
return do_shutdown(ec); | ||
} | ||
|
||
beast::get_lowest_layer(event_stream_) | ||
|
@@ -121,7 +119,7 @@ void EventOutbox::on_resolve(beast::error_code ec, | |
void EventOutbox::on_connect(beast::error_code ec, | ||
tcp::resolver::results_type::endpoint_type) { | ||
if (ec) { | ||
return do_shutdown(ec, "connect"); | ||
return do_shutdown(ec); | ||
} | ||
|
||
boost::system::error_code dummy; | ||
|
@@ -131,7 +129,7 @@ void EventOutbox::on_connect(beast::error_code ec, | |
|
||
void EventOutbox::on_flush_timer(boost::system::error_code ec) { | ||
if (ec && shutdown_) { | ||
return do_shutdown(ec, "flush"); | ||
return do_shutdown(ec); | ||
} | ||
|
||
if (!outbox_.empty()) { | ||
|
@@ -154,7 +152,7 @@ void EventOutbox::on_flush_timer(boost::system::error_code ec) { | |
|
||
void EventOutbox::on_write(beast::error_code ec, std::size_t) { | ||
if (ec) { | ||
return do_shutdown(ec, "write"); | ||
return do_shutdown(ec); | ||
} | ||
outbox_.pop(); | ||
on_flush_timer(boost::system::error_code{}); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,7 @@ int main(int argc, char* argv[]) { | |
launchdarkly::Logger logger{ | ||
std::make_unique<ConsoleBackend>("sse-contract-tests")}; | ||
|
||
const std::string default_port = "8123"; | ||
std::string const default_port = "8123"; | ||
std::string port = default_port; | ||
if (argc == 2) { | ||
port = | ||
|
@@ -38,6 +38,7 @@ int main(int argc, char* argv[]) { | |
srv.add_capability("report"); | ||
srv.add_capability("post"); | ||
srv.add_capability("reconnection"); | ||
srv.add_capability("read-timeout"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We were already passing the parameter along, just hadn't advertised the capability. |
||
|
||
net::signal_set signals{ioc, SIGINT, SIGTERM}; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -60,7 +60,9 @@ LoggingBuilder::BasicLogging& LoggingBuilder::BasicLogging::Tag( | |
return *this; | ||
} | ||
LoggingBuilder::BasicLogging::BasicLogging() | ||
: level_(Defaults<AnySDK>::LogLevel()), tag_(Defaults<AnySDK>::LogTag()) {} | ||
: level_(GetLogLevelEnum(std::getenv("LD_LOG_LEVEL"), | ||
Defaults<AnySDK>::LogLevel())), | ||
tag_(Defaults<AnySDK>::LogTag()) {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ideally, we can just run Now it works like you'd want. |
||
|
||
LoggingBuilder::CustomLogging& LoggingBuilder::CustomLogging::Backend( | ||
std::shared_ptr<ILogBackend> backend) { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lints.