Skip to content

Commit 54ce7f9

Browse files
authored
feat: follow redirects in SSE client (#104)
This PR allows the SSE client to follow temporary/permanent redirects. Going from http to https is not going to work because the initial client wouldn't have the ssl context, but this should cover the basic case. I've also run this revision 164 times through contract tests and didn't observe a crash yet.
1 parent 75070a6 commit 54ce7f9

File tree

8 files changed

+160
-30
lines changed

8 files changed

+160
-30
lines changed

.github/workflows/sse.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,4 +36,3 @@ jobs:
3636
with:
3737
repo: 'sse-contract-tests'
3838
test_service_port: ${{ env.TEST_SERVICE_PORT }}
39-
extra_params: '-skip HTTP'

apps/sse-contract-tests/include/definitions.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,10 @@ struct CommentMessage {
8484
};
8585

8686
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(CommentMessage, kind, comment);
87+
88+
struct ErrorMessage {
89+
std::string kind;
90+
std::string comment;
91+
};
92+
93+
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(ErrorMessage, kind, comment);

apps/sse-contract-tests/include/event_outbox.hpp

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
#include <memory>
1414
#include <string>
15+
#include <variant>
1516

1617
namespace beast = boost::beast;
1718
namespace http = beast::http;
@@ -44,23 +45,34 @@ class EventOutbox : public std::enable_shared_from_this<EventOutbox> {
4445
* @param callback_url Target URL.
4546
*/
4647
EventOutbox(net::any_io_executor executor, std::string callback_url);
48+
4749
/**
48-
* Enqueues an event, which will be posted to the server
50+
* Queues an event, which will be posted to the server
4951
* later.
5052
* @param event Event to post.
5153
*/
5254
void post_event(launchdarkly::sse::Event event);
55+
56+
/**
57+
* Queues an error, which will be posted to the server later.
58+
* @param error Error to post.
59+
*/
60+
void post_error(launchdarkly::sse::Error error);
61+
5362
/**
5463
* Begins an async operation to connect to the server.
5564
*/
5665
void run();
66+
5767
/**
5868
* Begins an async operation to disconnect from the server.
5969
*/
6070
void stop();
6171

6272
private:
63-
RequestType build_request(std::size_t counter, launchdarkly::sse::Event ev);
73+
RequestType build_request(
74+
std::size_t counter,
75+
std::variant<launchdarkly::sse::Event, launchdarkly::sse::Error> ev);
6476
void on_resolve(beast::error_code ec, tcp::resolver::results_type results);
6577
void on_connect(beast::error_code ec,
6678
tcp::resolver::results_type::endpoint_type);

apps/sse-contract-tests/src/entity_manager.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ std::optional<std::string> EntityManager::create(ConfigParams const& params) {
4343
copy->post_event(std::move(e));
4444
});
4545

46+
client_builder.errors(
47+
[copy = poster](launchdarkly::sse::Error e) { copy->post_error(e); });
48+
4649
auto client = client_builder.build();
4750
if (!client) {
4851
LD_LOG(logger_, LogLevel::kWarn)

apps/sse-contract-tests/src/event_outbox.cpp

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ void EventOutbox::post_event(launchdarkly::sse::Event event) {
4040
flush_timer_.expires_from_now(boost::posix_time::milliseconds(0));
4141
}
4242

43+
void EventOutbox::post_error(launchdarkly::sse::Error error) {
44+
auto http_request = build_request(callback_counter_++, error);
45+
outbox_.push(http_request);
46+
flush_timer_.expires_from_now(boost::posix_time::milliseconds(0));
47+
}
48+
4349
void EventOutbox::run() {
4450
resolver_.async_resolve(callback_host_, callback_port_,
4551
beast::bind_front_handler(&EventOutbox::on_resolve,
@@ -57,7 +63,7 @@ void EventOutbox::stop() {
5763

5864
EventOutbox::RequestType EventOutbox::build_request(
5965
std::size_t counter,
60-
launchdarkly::sse::Event ev) {
66+
std::variant<launchdarkly::sse::Event, launchdarkly::sse::Error> ev) {
6167
RequestType req;
6268

6369
req.set(http::field::host, callback_host_);
@@ -66,11 +72,34 @@ EventOutbox::RequestType EventOutbox::build_request(
6672

6773
nlohmann::json json;
6874

69-
if (ev.type() == "comment") {
70-
json = CommentMessage{"comment", std::move(ev).take()};
71-
} else {
72-
json = EventMessage{"event", Event{ev}};
73-
}
75+
std::visit(
76+
[&](auto&& arg) {
77+
using T = std::decay_t<decltype(arg)>;
78+
if constexpr (std::is_same_v<T, launchdarkly::sse::Event>) {
79+
if (arg.type() == "comment") {
80+
json = CommentMessage{"comment", std::move(arg).take()};
81+
} else {
82+
json = EventMessage{"event", Event{std::move(arg)}};
83+
}
84+
} else if constexpr (std::is_same_v<T, launchdarkly::sse::Error>) {
85+
using launchdarkly::sse::Error;
86+
auto msg = ErrorMessage{"error"};
87+
switch (arg) {
88+
case Error::NoContent:
89+
msg.comment = "no content";
90+
break;
91+
case Error::InvalidRedirectLocation:
92+
msg.comment = "invalid redirect location";
93+
break;
94+
case Error::UnrecoverableClientError:
95+
msg.comment = "unrecoverable client error";
96+
default:
97+
msg.comment = "unspecified error";
98+
}
99+
json = msg;
100+
}
101+
},
102+
std::move(ev));
74103

75104
req.body() = json.dump();
76105
req.prepare_payload();

libs/server-sent-events/include/launchdarkly/sse/client.hpp

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#pragma once
22

3+
#include <launchdarkly/sse/error.hpp>
34
#include <launchdarkly/sse/event.hpp>
45

56
#include <boost/asio/any_io_executor.hpp>
@@ -29,8 +30,9 @@ class Client;
2930
*/
3031
class Builder {
3132
public:
32-
using EventReceiver = std::function<void(launchdarkly::sse::Event)>;
33+
using EventReceiver = std::function<void(Event)>;
3334
using LogCallback = std::function<void(std::string)>;
35+
using ErrorCallback = std::function<void(Error)>;
3436

3537
/**
3638
* Create a builder for the given URL. If the port is omitted, 443 is
@@ -114,6 +116,13 @@ class Builder {
114116
*/
115117
Builder& logger(LogCallback callback);
116118

119+
/**
120+
* Specify an error reporting callback for the Client.
121+
* @param callback Callback to receive an error from the Client.
122+
* @return Reference to this builder.
123+
*/
124+
Builder& errors(ErrorCallback callback);
125+
117126
/**
118127
* Builds a Client. The shared pointer is necessary to extend the lifetime
119128
* of the Client to encompass each asynchronous operation that it performs.
@@ -131,6 +140,7 @@ class Builder {
131140
std::optional<std::chrono::milliseconds> connect_timeout_;
132141
LogCallback logging_cb_;
133142
EventReceiver receiver_;
143+
ErrorCallback error_cb_;
134144
};
135145

136146
/**
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
#pragma once
2+
3+
namespace launchdarkly::sse {
4+
5+
enum class Error {
6+
NoContent = 1,
7+
InvalidRedirectLocation = 2,
8+
UnrecoverableClientError = 3,
9+
};
10+
}

libs/server-sent-events/src/client.cpp

Lines changed: 80 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
#include <boost/beast/version.hpp>
1515

1616
#include <boost/url/parse.hpp>
17-
17+
#include <boost/url/url.hpp>
1818
#include <chrono>
1919
#include <iostream>
2020
#include <memory>
@@ -58,6 +58,7 @@ class FoxyClient : public Client,
5858
std::optional<std::chrono::milliseconds> write_timeout,
5959
Builder::EventReceiver receiver,
6060
Builder::LogCallback logger,
61+
Builder::ErrorCallback errors,
6162
std::optional<net::ssl::context> maybe_ssl)
6263
: ssl_context_(std::move(maybe_ssl)),
6364
host_(std::move(host)),
@@ -74,12 +75,13 @@ class FoxyClient : public Client,
7475
last_event_id_(std::nullopt),
7576
backoff_timer_(session_.get_executor()),
7677
event_receiver_(std::move(receiver)),
77-
logger_(std::move(logger)) {
78+
logger_(std::move(logger)),
79+
errors_(std::move(errors)) {
7880
create_parser();
7981
}
8082

81-
/** The body parser is recreated each time a connection is made because its
82-
* internal state cannot be explicitly reset.
83+
/** The body parser is recreated each time a connection is made because
84+
* its internal state cannot be explicitly reset.
8385
*
8486
* Since SSE body will never end unless
8587
* an error occurs, the body size limit must be removed.
@@ -100,6 +102,12 @@ class FoxyClient : public Client,
100102
void do_backoff(std::string const& reason) {
101103
backoff_.fail();
102104

105+
if (auto id = body_parser_->get().body().last_event_id()) {
106+
if (!id->empty()) {
107+
last_event_id_ = id;
108+
}
109+
}
110+
103111
std::stringstream msg;
104112
msg << "backing off in ("
105113
<< std::chrono::duration_cast<std::chrono::seconds>(
@@ -109,7 +117,6 @@ class FoxyClient : public Client,
109117

110118
logger_(msg.str());
111119

112-
last_event_id_ = body_parser_->get().body().last_event_id();
113120
create_parser();
114121
backoff_timer_.expires_from_now(backoff_.delay());
115122
backoff_timer_.async_wait(beast::bind_front_handler(
@@ -138,7 +145,7 @@ class FoxyClient : public Client,
138145
return do_backoff(ec.what());
139146
}
140147

141-
if (last_event_id_ && !last_event_id_->empty()) {
148+
if (last_event_id_) {
142149
req_.set("last-event-id", *last_event_id_);
143150
} else {
144151
req_.erase("last-event-id");
@@ -186,6 +193,10 @@ class FoxyClient : public Client,
186193
auto status_class = beast::http::to_status_class(response.result());
187194

188195
if (status_class == beast::http::status_class::successful) {
196+
if (response.result() == beast::http::status::no_content) {
197+
errors_(Error::NoContent);
198+
return;
199+
}
189200
if (!correct_content_type(response)) {
190201
return do_backoff("invalid Content-Type");
191202
}
@@ -197,13 +208,30 @@ class FoxyClient : public Client,
197208
shared_from_this()));
198209
}
199210

211+
if (status_class == beast::http::status_class::redirection) {
212+
if (can_redirect(response)) {
213+
auto new_url =
214+
redirect_url("base", response.find("location")->value());
215+
216+
if (!new_url) {
217+
errors_(Error::InvalidRedirectLocation);
218+
return;
219+
}
220+
221+
req_.set(http::field::host, new_url->host());
222+
req_.target(new_url->encoded_target());
223+
} else {
224+
errors_(Error::InvalidRedirectLocation);
225+
return;
226+
}
227+
}
228+
200229
if (status_class == beast::http::status_class::client_error) {
201230
if (recoverable_client_error(response.result())) {
202231
return do_backoff(backoff_reason(response.result()));
203232
}
204233

205-
// TODO: error callback
206-
234+
errors_(Error::UnrecoverableClientError);
207235
return;
208236
}
209237

@@ -244,11 +272,6 @@ class FoxyClient : public Client,
244272
}
245273
}
246274

247-
void fail(boost::system::error_code ec, std::string const& what) {
248-
logger_("sse-client: " + what + ": " + ec.message());
249-
async_shutdown(nullptr);
250-
}
251-
252275
static bool recoverable_client_error(beast::http::status status) {
253276
return (status == beast::http::status::bad_request ||
254277
status == beast::http::status::request_timeout ||
@@ -264,21 +287,52 @@ class FoxyClient : public Client,
264287
return false;
265288
}
266289

290+
static bool can_redirect(FoxyClient::response const& response) {
291+
return (response.result() == beast::http::status::moved_permanently ||
292+
response.result() == beast::http::status::temporary_redirect) &&
293+
response.find("location") != response.end();
294+
}
295+
296+
static std::optional<boost::urls::url> redirect_url(
297+
std::string orig_base,
298+
std::string orig_location) {
299+
auto location = boost::urls::parse_uri(orig_location);
300+
if (!location) {
301+
return std::nullopt;
302+
}
303+
if (location->has_scheme()) {
304+
return location.value();
305+
}
306+
307+
boost::urls::url base(orig_base);
308+
auto result = base.resolve(*location);
309+
if (!result) {
310+
return std::nullopt;
311+
}
312+
313+
return base;
314+
}
315+
267316
private:
268317
std::optional<net::ssl::context> ssl_context_;
269318
std::string host_;
270319
std::string port_;
320+
271321
std::optional<std::chrono::milliseconds> connect_timeout_;
272322
std::optional<std::chrono::milliseconds> read_timeout_;
273323
std::optional<std::chrono::milliseconds> write_timeout_;
324+
274325
http::request<http::string_body> req_;
326+
275327
Builder::EventReceiver event_receiver_;
276-
std::optional<http::response_parser<body> > body_parser_;
328+
Builder::LogCallback logger_;
329+
Builder::ErrorCallback errors_;
330+
331+
std::optional<http::response_parser<body>> body_parser_;
277332
launchdarkly::foxy::client_session session_;
278333
std::optional<std::string> last_event_id_;
279334
Backoff backoff_;
280335
boost::asio::steady_timer backoff_timer_;
281-
Builder::LogCallback logger_;
282336
};
283337

284338
Builder::Builder(net::any_io_executor ctx, std::string url)
@@ -287,9 +341,9 @@ Builder::Builder(net::any_io_executor ctx, std::string url)
287341
read_timeout_{std::nullopt},
288342
write_timeout_{std::nullopt},
289343
connect_timeout_{std::nullopt},
290-
logging_cb_([](auto msg) {}) {
291-
receiver_ = [](launchdarkly::sse::Event const&) {};
292-
344+
logging_cb_([](auto msg) {}),
345+
receiver_([](launchdarkly::sse::Event const&) {}),
346+
error_cb_([](auto err) {}) {
293347
request_.version(11);
294348
request_.set(http::field::user_agent, kDefaultUserAgent);
295349
request_.method(http::verb::get);
@@ -332,11 +386,16 @@ Builder& Builder::receiver(EventReceiver receiver) {
332386
return *this;
333387
}
334388

335-
Builder& Builder::logger(std::function<void(std::string)> callback) {
389+
Builder& Builder::logger(LogCallback callback) {
336390
logging_cb_ = std::move(callback);
337391
return *this;
338392
}
339393

394+
Builder& Builder::errors(ErrorCallback callback) {
395+
error_cb_ = std::move(callback);
396+
return *this;
397+
}
398+
340399
std::shared_ptr<Client> Builder::build() {
341400
auto uri_components = boost::urls::parse_uri(url_);
342401
if (!uri_components) {
@@ -376,7 +435,8 @@ std::shared_ptr<Client> Builder::build() {
376435

377436
return std::make_shared<FoxyClient>(
378437
net::make_strand(executor_), request, host, service, connect_timeout_,
379-
read_timeout_, write_timeout_, receiver_, logging_cb_, std::move(ssl));
438+
read_timeout_, write_timeout_, receiver_, logging_cb_, error_cb_,
439+
std::move(ssl));
380440
}
381441

382442
} // namespace launchdarkly::sse

0 commit comments

Comments
 (0)