Skip to content

Commit 9931b96

Browse files
feat: Implement streaming data source. (#17)
Co-authored-by: Casey Waldren <[email protected]>
1 parent c72034c commit 9931b96

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

54 files changed

+1596
-196
lines changed

apps/hello-cpp/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
#include <launchdarkly/api.hpp>
21
#include <launchdarkly/sse/client.hpp>
2+
#include "launchdarkly/client_side/api.hpp"
33

44
#include <boost/asio/io_context.hpp>
55

bindings/c/src/api.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
#include "launchdarkly/client_side/api.hpp"
12
#include <launchdarkly/api.h>
2-
#include <launchdarkly/api.hpp>
33

44
bool launchdarkly_foo(int32_t* out_result) {
55
if (auto val = launchdarkly::foo()) {
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#pragma once
2+
3+
namespace launchdarkly::client_side {
4+
5+
class IDataSource {
6+
public:
7+
virtual void start() = 0;
8+
virtual void close() = 0;
9+
10+
virtual ~IDataSource() = default;
11+
IDataSource(IDataSource const& item) = delete;
12+
IDataSource(IDataSource&& item) = delete;
13+
IDataSource& operator=(IDataSource const&) = delete;
14+
IDataSource& operator=(IDataSource&&) = delete;
15+
16+
protected:
17+
IDataSource() = default;
18+
};
19+
20+
} // namespace launchdarkly::client_side
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
#pragma once
2+
3+
#include <map>
4+
#include <string>
5+
6+
#include "config/detail/service_endpoints.hpp"
7+
#include "context.hpp"
8+
#include "data/evaluation_result.hpp"
9+
10+
namespace launchdarkly::client_side {
11+
12+
/**
13+
* An item descriptor is an abstraction that allows for Flag data to be
14+
* handled using the same type in both a put or a patch.
15+
*/
16+
struct ItemDescriptor {
17+
/**
18+
* The version number of this data, provided by the SDK.
19+
*/
20+
uint64_t version;
21+
22+
/**
23+
* The data item, or nullopt if this is a deleted item placeholder.
24+
*/
25+
std::optional<EvaluationResult> flag;
26+
27+
explicit ItemDescriptor(uint64_t version);
28+
29+
explicit ItemDescriptor(EvaluationResult flag);
30+
31+
ItemDescriptor(ItemDescriptor const& item) = default;
32+
ItemDescriptor(ItemDescriptor&& item) = default;
33+
ItemDescriptor& operator=(ItemDescriptor const&) = default;
34+
ItemDescriptor& operator=(ItemDescriptor&&) = default;
35+
~ItemDescriptor() = default;
36+
37+
friend std::ostream& operator<<(std::ostream& out,
38+
ItemDescriptor const& descriptor);
39+
};
40+
41+
/**
42+
* Interface for handling updates from LaunchDarkly.
43+
*/
44+
class IDataSourceUpdateSink {
45+
public:
46+
virtual void init(std::map<std::string, ItemDescriptor> data) = 0;
47+
virtual void upsert(std::string key, ItemDescriptor) = 0;
48+
49+
// We could add this if we want to support data source status.
50+
// virtual void status(<something>)
51+
52+
IDataSourceUpdateSink(IDataSourceUpdateSink const& item) = delete;
53+
IDataSourceUpdateSink(IDataSourceUpdateSink&& item) = delete;
54+
IDataSourceUpdateSink& operator=(IDataSourceUpdateSink const&) = delete;
55+
IDataSourceUpdateSink& operator=(IDataSourceUpdateSink&&) = delete;
56+
virtual ~IDataSourceUpdateSink() = default;
57+
58+
protected:
59+
IDataSourceUpdateSink() = default;
60+
};
61+
62+
bool operator==(ItemDescriptor const& lhs, ItemDescriptor const& rhs);
63+
64+
} // namespace launchdarkly::client_side
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
#pragma once
2+
3+
#include <string>
4+
5+
namespace launchdarkly::client_side::data_sources::detail {
6+
7+
/**
8+
* Return a base64 encoded version of the input string.
9+
* This version is URL safe, which means where a typical '+' or '/' are used
10+
* instead a '-' or '/' will be used.
11+
* @param input The string to Base64 encode.
12+
* @return The encoded value.
13+
*/
14+
std::string Base64UrlEncode(std::string const& input);
15+
16+
} // namespace launchdarkly::client_size
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#pragma once
2+
3+
#include <boost/asio/any_io_executor.hpp>
4+
5+
#include "config/detail/service_endpoints.hpp"
6+
#include "context.hpp"
7+
#include "data/evaluation_result.hpp"
8+
#include "launchdarkly/client_side/data_source.hpp"
9+
#include "launchdarkly/client_side/data_source_update_sink.hpp"
10+
#include "launchdarkly/sse/client.hpp"
11+
#include "logger.hpp"
12+
13+
namespace launchdarkly::client_side::data_sources::detail {
14+
15+
/**
16+
* This class handles events source events, parses them, and then uses
17+
* a IDataSourceUpdateSink to process the parsed events.
18+
*/
19+
class StreamingDataHandler {
20+
public:
21+
/**
22+
* Status indicating if the message was processed, or if there
23+
* was an issue encountered.
24+
*/
25+
enum class MessageStatus {
26+
kMessageHandled,
27+
kInvalidMessage,
28+
kUnhandledVerb
29+
};
30+
31+
/**
32+
* Represents patch JSON from the LaunchDarkly service.
33+
*/
34+
struct PatchData {
35+
std::string key;
36+
EvaluationResult flag;
37+
};
38+
39+
/**
40+
* Represents delete JSON from the LaunchDarkly service.
41+
*/
42+
struct DeleteData {
43+
std::string key;
44+
uint64_t version;
45+
};
46+
47+
StreamingDataHandler(std::shared_ptr<IDataSourceUpdateSink> handler,
48+
Logger const& logger);
49+
50+
/**
51+
* Handle an SSE event.
52+
* @param event The event to handle.
53+
* @return A status indicating if the message could be handled.
54+
*/
55+
MessageStatus handle_message(launchdarkly::sse::Event const& event);
56+
57+
private:
58+
std::shared_ptr<IDataSourceUpdateSink> handler_;
59+
Logger const& logger_;
60+
};
61+
} // namespace launchdarkly::client_side::data_sources::detail
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
#pragma once
2+
3+
#include <chrono>
4+
using namespace std::chrono_literals;
5+
6+
#include <boost/asio/any_io_executor.hpp>
7+
8+
#include "config/detail/http_properties.hpp"
9+
#include "config/detail/service_endpoints.hpp"
10+
#include "context.hpp"
11+
#include "data/evaluation_result.hpp"
12+
#include "launchdarkly/client_side/data_source.hpp"
13+
#include "launchdarkly/client_side/data_source_update_sink.hpp"
14+
#include "launchdarkly/client_side/data_sources/detail/streaming_data_handler.hpp"
15+
#include "launchdarkly/sse/client.hpp"
16+
#include "logger.hpp"
17+
18+
namespace launchdarkly::client_side::data_sources::detail {
19+
20+
class StreamingDataSource final : public IDataSource {
21+
public:
22+
StreamingDataSource(std::string const& sdk_key,
23+
boost::asio::any_io_executor ioc,
24+
Context const& context,
25+
config::ServiceEndpoints const& endpoints,
26+
config::detail::HttpProperties const& http_properties,
27+
bool use_report,
28+
bool with_reasons,
29+
std::shared_ptr<IDataSourceUpdateSink> handler,
30+
Logger const& logger);
31+
32+
void start() override;
33+
void close() override;
34+
35+
private:
36+
StreamingDataHandler data_source_handler_;
37+
std::string streaming_endpoint_;
38+
std::string string_context_;
39+
40+
Logger const& logger_;
41+
std::shared_ptr<launchdarkly::sse::Client> client_;
42+
43+
inline static const std::string streaming_path_ = "/meval";
44+
};
45+
} // namespace launchdarkly::client_side::data_sources::detail

libs/client-sdk/src/CMakeLists.txt

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22
file(GLOB HEADER_LIST CONFIGURE_DEPENDS "${LaunchDarklyCPPClient_SOURCE_DIR}/include/launchdarkly/*.hpp")
33

44
# Automatic library: static or dynamic based on user config.
5-
add_library(${LIBNAME} api.cpp ${HEADER_LIST})
5+
add_library(${LIBNAME} api.cpp ${HEADER_LIST} data_sources/streaming_data_source.cpp data_sources/base_64.cpp data_sources/streaming_data_handler.cpp data_source_update_sink.cpp)
6+
7+
target_link_libraries(${LIBNAME} launchdarkly::common launchdarkly::sse)
68

79
add_library(launchdarkly::client ALIAS ${LIBNAME})
810

libs/client-sdk/src/api.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
#include "launchdarkly/api.hpp"
1+
#include "launchdarkly/client_side/api.hpp"
22

33
#include <cstdint>
44
#include <optional>
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#include "launchdarkly/client_side/data_source_update_sink.hpp"
2+
3+
namespace launchdarkly::client_side {
4+
5+
bool operator==(ItemDescriptor const& lhs, ItemDescriptor const& rhs) {
6+
return lhs.version == rhs.version && lhs.flag == rhs.flag;
7+
}
8+
9+
std::ostream& operator<<(std::ostream& out, ItemDescriptor const& descriptor) {
10+
out << "{";
11+
out << " version: " << descriptor.version;
12+
if (descriptor.flag.has_value()) {
13+
out << " flag: " << descriptor.flag.value();
14+
} else {
15+
out << " flag: <nullopt>";
16+
}
17+
return out;
18+
}
19+
ItemDescriptor::ItemDescriptor(uint64_t version) : version(version) {}
20+
21+
ItemDescriptor::ItemDescriptor(EvaluationResult flag)
22+
: version(flag.version()), flag(std::move(flag)) {}
23+
} // namespace launchdarkly::client_side
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#include "launchdarkly/client_side/data_sources/detail/base_64.hpp"
2+
3+
#include <array>
4+
#include <bitset>
5+
#include <climits>
6+
#include <cstddef>
7+
8+
static unsigned char const kEncodeSize = 4;
9+
static unsigned char const kInputBytesPerEncodeSize = 3;
10+
11+
// Size of the index into the base64_table.
12+
// Base64 uses a 6 bit index.
13+
static unsigned long const kIndexBits = 6UL;
14+
15+
namespace launchdarkly::client_side::data_sources::detail {
16+
17+
// URL safe base64 table.
18+
static std::array<unsigned char, 65> const kBase64Table{
19+
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"};
20+
21+
/**
22+
* Get a bit set populated with the bits at the specific start_bit through
23+
* the count.
24+
*/
25+
static std::bitset<kIndexBits> GetBits(std::size_t start_bit,
26+
std::size_t count,
27+
std::string const& input) {
28+
std::bitset<kIndexBits> out_set;
29+
auto out_index = 0;
30+
// Iterate the bits from the highest bit. bit 0, would be the 7th
31+
// bit in the first byte.
32+
for (auto bit_index = start_bit; bit_index < start_bit + count;
33+
bit_index++) {
34+
auto str_index = bit_index / CHAR_BIT;
35+
auto character = input[str_index];
36+
size_t bit_in_byte = (CHAR_BIT - 1) - (bit_index % CHAR_BIT);
37+
unsigned char bit_mask = 1 << (bit_in_byte);
38+
auto bit = (bit_mask & character) != 0;
39+
out_set[out_set.size() - 1 - out_index] = bit;
40+
out_index++;
41+
}
42+
return out_set;
43+
}
44+
45+
std::string Base64UrlEncode(std::string const& input) {
46+
auto bit_count = input.size() * CHAR_BIT;
47+
std::string out;
48+
std::size_t bit_index = 0;
49+
50+
// Every 3 bytes takes 4 characters of output.
51+
auto reserve_size = (input.size() / kInputBytesPerEncodeSize) * kEncodeSize;
52+
// If not a multiple of 3, then we need to add 4 more bytes to the size.
53+
// This will contain the extra encoded characters and padding.
54+
if ((input.size() % kInputBytesPerEncodeSize) != 0U) {
55+
reserve_size += kEncodeSize;
56+
}
57+
out.reserve(reserve_size);
58+
59+
while (bit_index < bit_count) {
60+
// Get either 6 bits, or the remaining number of bits.
61+
auto bits = GetBits(bit_index,
62+
std::min(kIndexBits, bit_count - bit_index), input);
63+
out.push_back(static_cast<char>(kBase64Table.at(bits.to_ulong())));
64+
bit_index += kIndexBits;
65+
}
66+
// If the string is not divisible evenly by the kEncodeSize
67+
// then pad it with '=' until it is.
68+
if (out.size() % kEncodeSize != 0) {
69+
out.append(kEncodeSize - (out.size()) % kEncodeSize, '=');
70+
}
71+
return out;
72+
}
73+
74+
} // namespace launchdarkly::client_side::data_sources::detail

0 commit comments

Comments
 (0)