Skip to content

Commit 1e780ce

Browse files
committed
fix sse behavior when reconnecting to streams
1 parent d94c4c2 commit 1e780ce

File tree

1 file changed

+34
-21
lines changed

1 file changed

+34
-21
lines changed

libs/server-sent-events/src/client.cpp

Lines changed: 34 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -79,20 +79,20 @@ class FoxyClient : public Client,
7979
read_timeout_(read_timeout),
8080
write_timeout_(write_timeout),
8181
req_(std::move(req)),
82-
session_(std::move(executor),
83-
launchdarkly::foxy::session_opts{
84-
ToOptRef(ssl_context_),
85-
connect_timeout.value_or(kNoTimeout)}),
8682
backoff_(
8783
initial_reconnect_delay.value_or(kDefaultInitialReconnectDelay),
8884
kDefaultMaxBackoffDelay),
8985
last_event_id_(std::nullopt),
90-
backoff_timer_(session_.get_executor()),
86+
backoff_timer_(std::move(executor)),
9187
event_receiver_(std::move(receiver)),
9288
logger_(std::move(logger)),
9389
errors_(std::move(errors)),
94-
last_read_(std::nullopt) {
90+
last_read_(std::nullopt),
91+
shutting_down_(false),
92+
body_parser_(std::nullopt),
93+
session_(std::nullopt) {
9594
create_parser();
95+
create_session();
9696
}
9797

9898
/** Logs a message indicating that an async_read_some operation
@@ -132,6 +132,13 @@ class FoxyClient : public Client,
132132
body_parser_->get().body().on_event(event_receiver_);
133133
}
134134

135+
void create_session() {
136+
session_.emplace(
137+
backoff_timer_.get_executor(),
138+
launchdarkly::foxy::session_opts{
139+
ToOptRef(ssl_context_), connect_timeout_.value_or(kNoTimeout)});
140+
}
141+
135142
/**
136143
* Called whenever the connection needs to be reattempted, triggering
137144
* a timed wait for the current backoff duration.
@@ -158,6 +165,8 @@ class FoxyClient : public Client,
158165
logger_(msg.str());
159166

160167
create_parser();
168+
create_session();
169+
161170
backoff_timer_.expires_from_now(backoff_.delay());
162171
backoff_timer_.async_wait(beast::bind_front_handler(
163172
&FoxyClient::on_backoff, shared_from_this()));
@@ -171,7 +180,7 @@ class FoxyClient : public Client,
171180
}
172181

173182
void run() override {
174-
session_.async_connect(
183+
session_->async_connect(
175184
host_, port_,
176185
beast::bind_front_handler(&FoxyClient::on_connect,
177186
shared_from_this()));
@@ -190,10 +199,10 @@ class FoxyClient : public Client,
190199
} else {
191200
req_.erase("last-event-id");
192201
}
193-
session_.opts.timeout = write_timeout_.value_or(kNoTimeout);
194-
session_.async_write(req_,
195-
beast::bind_front_handler(&FoxyClient::on_write,
196-
shared_from_this()));
202+
session_->opts.timeout = write_timeout_.value_or(kNoTimeout);
203+
session_->async_write(req_,
204+
beast::bind_front_handler(&FoxyClient::on_write,
205+
shared_from_this()));
197206
}
198207

199208
void on_write(boost::system::error_code ec, std::size_t amount) {
@@ -205,8 +214,8 @@ class FoxyClient : public Client,
205214
return do_backoff(ec.what());
206215
}
207216

208-
session_.opts.timeout = read_timeout_.value_or(kNoTimeout);
209-
session_.async_read_header(
217+
session_->opts.timeout = read_timeout_.value_or(kNoTimeout);
218+
session_->async_read_header(
210219
*body_parser_, beast::bind_front_handler(&FoxyClient::on_headers,
211220
shared_from_this()));
212221
}
@@ -222,7 +231,7 @@ class FoxyClient : public Client,
222231

223232
if (!body_parser_->is_header_done()) {
224233
/* keep reading headers */
225-
return session_.async_read_header(
234+
return session_->async_read_header(
226235
*body_parser_,
227236
beast::bind_front_handler(&FoxyClient::on_headers,
228237
shared_from_this()));
@@ -244,7 +253,7 @@ class FoxyClient : public Client,
244253
backoff_.succeed();
245254

246255
last_read_ = std::chrono::steady_clock::now();
247-
return session_.async_read_some(
256+
return session_->async_read_some(
248257
*body_parser_,
249258
beast::bind_front_handler(&FoxyClient::on_read_body,
250259
shared_from_this()));
@@ -290,13 +299,13 @@ class FoxyClient : public Client,
290299

291300
void on_read_body(boost::system::error_code ec, std::size_t amount) {
292301
boost::ignore_unused(amount);
293-
if (ec == boost::asio::error::operation_aborted) {
294-
logger_("read HTTP response body aborted");
302+
if (ec == boost::asio::error::operation_aborted && shutting_down_) {
303+
logger_("read HTTP response body aborted (shutting down)");
295304
return;
296305
}
297306
if (!ec) {
298307
log_and_update_last_read(amount);
299-
return session_.async_read_some(
308+
return session_->async_read_some(
300309
*body_parser_,
301310
beast::bind_front_handler(&FoxyClient::on_read_body,
302311
shared_from_this()));
@@ -305,14 +314,16 @@ class FoxyClient : public Client,
305314
}
306315

307316
void async_shutdown(std::function<void()> completion) override {
308-
boost::asio::post(session_.get_executor(),
317+
boost::asio::post(session_->get_executor(),
309318
beast::bind_front_handler(&FoxyClient::do_shutdown,
310319
shared_from_this(),
311320
std::move(completion)));
312321
}
313322

314323
void do_shutdown(std::function<void()> completion) {
315-
session_.async_shutdown(beast::bind_front_handler(
324+
shutting_down_ = true;
325+
backoff_timer_.cancel();
326+
session_->async_shutdown(beast::bind_front_handler(
316327
&FoxyClient::on_shutdown, std::move(completion)));
317328
}
318329

@@ -381,12 +392,14 @@ class FoxyClient : public Client,
381392
Builder::ErrorCallback errors_;
382393

383394
std::optional<http::response_parser<body>> body_parser_;
384-
launchdarkly::foxy::client_session session_;
395+
std::optional<launchdarkly::foxy::client_session> session_;
385396
std::optional<std::string> last_event_id_;
386397
Backoff backoff_;
387398
boost::asio::steady_timer backoff_timer_;
388399

389400
std::optional<std::chrono::steady_clock::time_point> last_read_;
401+
402+
bool shutting_down_;
390403
};
391404

392405
Builder::Builder(net::any_io_executor ctx, std::string url)

0 commit comments

Comments
 (0)