Skip to content

Commit ab2b0fe

Browse files
authored
feat: eventsource client (#1)
Implements an eventsource client. The client does not yet pass redirect/retry contract tests.
1 parent c02c70c commit ab2b0fe

35 files changed

+1862
-555
lines changed

CMakeLists.txt

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,20 @@ project(
1111
LANGUAGES CXX C
1212
)
1313

14+
if (${CMAKE_VERSION} VERSION_GREATER_EQUAL "3.24")
15+
# Affects robustness of timestamp checking on FetchContent dependencies.
16+
cmake_policy(SET CMP0135 NEW)
17+
endif ()
18+
19+
# All projects in this repo should share the same version of 3rd party depends.
20+
# It's the only way to remain sane.
21+
set(CMAKE_FILES "${CMAKE_CURRENT_SOURCE_DIR}/cmake")
1422
set(CMAKE_CXX_STANDARD 17)
1523

24+
1625
option(BUILD_TESTING "Enable C++ unit tests." ON)
1726

18-
if(BUILD_TESTING)
27+
if (BUILD_TESTING)
1928
include(FetchContent)
2029
FetchContent_Declare(
2130
googletest
@@ -26,7 +35,7 @@ if(BUILD_TESTING)
2635
FetchContent_MakeAvailable(googletest)
2736

2837
enable_testing()
29-
endif()
38+
endif ()
3039

3140

3241
add_subdirectory(libs/common)

apps/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1+
#add_subdirectory(hello-c)
2+
add_subdirectory(sse-contract-tests)
13
add_subdirectory(hello-cpp)

apps/hello-cpp/main.cpp

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
#include <iostream>
21
#include <launchdarkly/api.hpp>
3-
#include <launchdarkly/sse/sse.hpp>
4-
#include <thread>
2+
#include <launchdarkly/sse/client.hpp>
3+
4+
#include <boost/asio/io_context.hpp>
5+
56
#include "console_backend.hpp"
67
#include "logger.hpp"
78

9+
#include <iostream>
10+
#include <utility>
11+
812
namespace net = boost::asio; // from <boost/asio.hpp>
913

1014
using launchdarkly::ConsoleBackend;
@@ -22,20 +26,30 @@ int main() {
2226

2327
net::io_context ioc;
2428

25-
// curl "https://stream-stg.launchdarkly.com/all?filter=even-flags-2" -H
26-
// "Authorization: sdk-66a5dbe0-8b26-445a-9313-761e7e3d381b" -v
29+
char const* key = std::getenv("STG_SDK_KEY");
30+
if (!key) {
31+
std::cout << "Set environment variable STG_SDK_KEY to the sdk key\n";
32+
return 1;
33+
}
2734
auto client =
28-
launchdarkly::sse::builder(ioc,
35+
launchdarkly::sse::Builder(ioc.get_executor(),
2936
"https://stream-stg.launchdarkly.com/all")
30-
.header("Authorization", "sdk-66a5dbe0-8b26-445a-9313-761e7e3d381b")
37+
.header("Authorization", key)
38+
.receiver([&](launchdarkly::sse::Event ev) {
39+
LD_LOG(logger, LogLevel::kInfo) << "event: " << ev.type();
40+
LD_LOG(logger, LogLevel::kInfo)
41+
<< "data: " << std::move(ev).take();
42+
})
43+
.logger([&](std::string msg) {
44+
LD_LOG(logger, LogLevel::kDebug) << std::move(msg);
45+
})
3146
.build();
3247

3348
if (!client) {
3449
LD_LOG(logger, LogLevel::kError) << "Failed to build client";
3550
return 1;
3651
}
3752

38-
std::thread t([&]() { ioc.run(); });
39-
4053
client->run();
54+
ioc.run();
4155
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Required for Apple Silicon support.
2+
cmake_minimum_required(VERSION 3.19)
3+
4+
project(
5+
LaunchDarklyCPPSSETestHarness
6+
VERSION 0.1
7+
DESCRIPTION "LaunchDarkly CPP SSE Test Harness"
8+
LANGUAGES CXX
9+
)
10+
11+
include(${CMAKE_FILES}/json.cmake)
12+
13+
add_executable(sse-tests
14+
src/main.cpp
15+
src/server.cpp
16+
src/entity_manager.cpp
17+
src/session.cpp
18+
src/event_outbox.cpp
19+
)
20+
21+
target_link_libraries(sse-tests PRIVATE
22+
launchdarkly::sse
23+
launchdarkly::common
24+
nlohmann_json::nlohmann_json
25+
)
26+
27+
target_include_directories(sse-tests PUBLIC include)
28+
29+
#add_definitions(-DBOOST_ASIO_ENABLE_HANDLER_TRACKING)

apps/sse-contract-tests/README.md

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
## SSE contract tests
2+
3+
Contract tests have a "test service" on one side, and the "test harness" on
4+
the other.
5+
6+
This project implements the test service for the C++ EventSource client.
7+
8+
**session (session.hpp)**
9+
10+
This provides a simple REST API for creating/destroying
11+
test entities. Examples:
12+
13+
`GET /` - returns the capabilities of this service.
14+
15+
`DELETE /` - shutdown the service.
16+
17+
`POST /` - create a new test entity, and return its ID.
18+
19+
`DELETE /entity/1` - delete the an entity identified by `1`.
20+
21+
**entity manager (entity_manager.hpp)**
22+
23+
This manages "entities" - the combination of an SSE client, and an outbox that posts events _received_ from the stream
24+
_back to_ the test harness.
25+
26+
The point is to allow the test harness to assert that events were parsed and dispatched as expected.
27+
28+
**event outbox (event_outbox.hpp)**
29+
30+
The 2nd half of an "entity". It receives events from the SSE client, pushes them into a queue,
31+
and then periodically flushes the queue out to the test harness.
32+
33+
**definitions (definitions.hpp)**
34+
35+
Contains JSON definitions that are used to communicate with the test harness.
36+
37+
**server (server.hpp)**
38+
39+
Glues everything together, mainly providing the TCP acceptor that spawns new sessions.
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
#pragma once
2+
3+
#include <launchdarkly/sse/event.hpp>
4+
#include <optional>
5+
#include <string>
6+
#include <unordered_map>
7+
#include "nlohmann/json.hpp"
8+
9+
namespace nlohmann {
10+
11+
template <typename T>
12+
struct adl_serializer<std::optional<T>> {
13+
static void to_json(json& j, std::optional<T> const& opt) {
14+
if (opt == std::nullopt) {
15+
j = nullptr;
16+
} else {
17+
j = *opt; // this will call adl_serializer<T>::to_json which will
18+
// find the free function to_json in T's namespace!
19+
}
20+
}
21+
22+
static void from_json(json const& j, std::optional<T>& opt) {
23+
if (j.is_null()) {
24+
opt = std::nullopt;
25+
} else {
26+
opt = j.get<T>(); // same as above, but with
27+
// adl_serializer<T>::from_json
28+
}
29+
}
30+
};
31+
} // namespace nlohmann
32+
33+
// Represents the initial JSON configuration sent by the test harness.
34+
struct ConfigParams {
35+
std::string streamUrl;
36+
std::string callbackUrl;
37+
std::string tag;
38+
std::optional<uint32_t> initialDelayMs;
39+
std::optional<uint32_t> readTimeoutMs;
40+
std::optional<std::string> lastEventId;
41+
std::optional<std::unordered_map<std::string, std::string>> headers;
42+
std::optional<std::string> method;
43+
std::optional<std::string> body;
44+
};
45+
46+
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(ConfigParams,
47+
streamUrl,
48+
callbackUrl,
49+
tag,
50+
initialDelayMs,
51+
readTimeoutMs,
52+
lastEventId,
53+
headers,
54+
method,
55+
body);
56+
57+
// Represents an event payload that this service posts back
58+
// to the test harness. The events are originally received by this server
59+
// via the SSE stream; they are posted back so the test harness can verify
60+
// that we parsed and dispatched them successfully.
61+
struct Event {
62+
std::string type;
63+
std::string id;
64+
std::string data;
65+
Event() = default;
66+
explicit Event(launchdarkly::sse::Event event)
67+
: type(event.type()),
68+
id(event.id().value_or("")),
69+
data(std::move(event).take()) {}
70+
};
71+
72+
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(Event, type, data, id);
73+
74+
struct EventMessage {
75+
std::string kind;
76+
Event event;
77+
};
78+
79+
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(EventMessage, kind, event);
80+
81+
struct CommentMessage {
82+
std::string kind;
83+
std::string comment;
84+
};
85+
86+
NLOHMANN_DEFINE_TYPE_NON_INTRUSIVE_WITH_DEFAULT(CommentMessage, kind, comment);
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#pragma once
2+
3+
#include "definitions.hpp"
4+
#include "logger.hpp"
5+
6+
#include <launchdarkly/sse/client.hpp>
7+
8+
#include <boost/asio/any_io_executor.hpp>
9+
10+
#include <memory>
11+
#include <mutex>
12+
#include <optional>
13+
#include <string>
14+
#include <unordered_map>
15+
16+
class EventOutbox;
17+
18+
class EntityManager {
19+
using Inbox = std::shared_ptr<launchdarkly::sse::Client>;
20+
using Outbox = std::shared_ptr<EventOutbox>;
21+
using Entity = std::pair<Inbox, Outbox>;
22+
23+
std::unordered_map<std::string, Entity> entities_;
24+
25+
std::size_t counter_;
26+
boost::asio::any_io_executor executor_;
27+
28+
launchdarkly::Logger& logger_;
29+
30+
public:
31+
/**
32+
* Create an entity manager, which can be used to create and destroy
33+
* entities (SSE clients + event channel back to test harness).
34+
* @param executor Executor.
35+
* @param logger Logger.
36+
*/
37+
EntityManager(boost::asio::any_io_executor executor,
38+
launchdarkly::Logger& logger);
39+
/**
40+
* Create an entity with the given configuration.
41+
* @param params Config of the entity.
42+
* @return An ID representing the entity, or none if the entity couldn't
43+
* be created.
44+
*/
45+
std::optional<std::string> create(ConfigParams params);
46+
/**
47+
* Destroy an entity with the given ID.
48+
* @param id ID of the entity.
49+
* @return True if the entity was found and destroyed.
50+
*/
51+
bool destroy(std::string const& id);
52+
};
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#pragma once
2+
3+
#include "entity_manager.hpp"
4+
5+
#include <launchdarkly/sse/client.hpp>
6+
7+
#include <boost/asio/deadline_timer.hpp>
8+
#include <boost/asio/ip/tcp.hpp>
9+
#include <boost/beast/core/tcp_stream.hpp>
10+
#include <boost/beast/http.hpp>
11+
#include <boost/lockfree/spsc_queue.hpp>
12+
13+
#include <memory>
14+
#include <string>
15+
16+
namespace beast = boost::beast;
17+
namespace http = beast::http;
18+
namespace net = boost::asio;
19+
using tcp = boost::asio::ip::tcp;
20+
21+
class EventOutbox : public std::enable_shared_from_this<EventOutbox> {
22+
using RequestType = http::request<http::string_body>;
23+
24+
std::string callback_url_;
25+
std::string callback_port_;
26+
std::string callback_host_;
27+
size_t callback_counter_;
28+
29+
net::any_io_executor executor_;
30+
tcp::resolver resolver_;
31+
beast::tcp_stream event_stream_;
32+
33+
boost::lockfree::spsc_queue<RequestType> outbox_;
34+
35+
net::deadline_timer flush_timer_;
36+
std::string id_;
37+
38+
bool shutdown_;
39+
40+
public:
41+
/**
42+
* Instantiate an outbox; events will be posted to the given URL.
43+
* @param executor Executor.
44+
* @param callback_url Target URL.
45+
*/
46+
EventOutbox(net::any_io_executor executor, std::string callback_url);
47+
/**
48+
* Enqueues an event, which will be posted to the server
49+
* later.
50+
* @param event Event to post.
51+
*/
52+
void post_event(launchdarkly::sse::Event event);
53+
/**
54+
* Begins an async operation to connect to the server.
55+
*/
56+
void run();
57+
/**
58+
* Begins an async operation to disconnect from the server.
59+
*/
60+
void stop();
61+
62+
private:
63+
RequestType build_request(std::size_t counter, launchdarkly::sse::Event ev);
64+
void on_resolve(beast::error_code ec, tcp::resolver::results_type results);
65+
void on_connect(beast::error_code ec,
66+
tcp::resolver::results_type::endpoint_type);
67+
void on_flush_timer(boost::system::error_code ec);
68+
void on_write(beast::error_code ec, std::size_t);
69+
void do_shutdown(beast::error_code ec, std::string what);
70+
};

0 commit comments

Comments
 (0)