-
Notifications
You must be signed in to change notification settings - Fork 3
feat: Implement streaming data source. #17
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
21fbb55
d94d9ac
4d5f2ce
514ebae
55798cc
117ca4f
908ef2c
b01e1ad
2cd4ecb
1775377
7628768
2a783e9
98802c1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
#pragma once | ||
|
||
namespace launchdarkly::client_side { | ||
|
||
class IDataSource { | ||
public: | ||
virtual void start() = 0; | ||
virtual void close() = 0; | ||
|
||
virtual ~IDataSource() = default; | ||
IDataSource(IDataSource const& item) = delete; | ||
IDataSource(IDataSource&& item) = delete; | ||
IDataSource& operator=(IDataSource const&) = delete; | ||
IDataSource& operator=(IDataSource&&) = delete; | ||
|
||
protected: | ||
IDataSource() = default; | ||
}; | ||
|
||
} // namespace launchdarkly::client_side |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
#pragma once | ||
|
||
#include <map> | ||
#include <string> | ||
|
||
#include "config/detail/service_endpoints.hpp" | ||
#include "context.hpp" | ||
#include "data/evaluation_result.hpp" | ||
|
||
namespace launchdarkly::client_side { | ||
|
||
/** | ||
* An item descriptor is an abstraction that allows for Flag data to be | ||
* handled using the same type in both a put or a patch. | ||
*/ | ||
struct ItemDescriptor { | ||
/** | ||
* The version number of this data, provided by the SDK. | ||
*/ | ||
uint64_t version; | ||
|
||
/** | ||
* The data item, or nullopt if this is a deleted item placeholder. | ||
*/ | ||
std::optional<EvaluationResult> flag; | ||
|
||
explicit ItemDescriptor(uint64_t version); | ||
|
||
explicit ItemDescriptor(EvaluationResult flag); | ||
|
||
ItemDescriptor(ItemDescriptor const& item) = default; | ||
ItemDescriptor(ItemDescriptor&& item) = default; | ||
ItemDescriptor& operator=(ItemDescriptor const&) = default; | ||
ItemDescriptor& operator=(ItemDescriptor&&) = default; | ||
~ItemDescriptor() = default; | ||
|
||
friend std::ostream& operator<<(std::ostream& out, | ||
ItemDescriptor const& descriptor); | ||
}; | ||
|
||
/** | ||
* Interface for handling updates from LaunchDarkly. | ||
*/ | ||
class IDataSourceUpdateSink { | ||
public: | ||
virtual void init(std::map<std::string, ItemDescriptor> data) = 0; | ||
virtual void upsert(std::string key, ItemDescriptor) = 0; | ||
|
||
// We could add this if we want to support data source status. | ||
// virtual void status(<something>) | ||
|
||
IDataSourceUpdateSink(IDataSourceUpdateSink const& item) = delete; | ||
IDataSourceUpdateSink(IDataSourceUpdateSink&& item) = delete; | ||
IDataSourceUpdateSink& operator=(IDataSourceUpdateSink const&) = delete; | ||
IDataSourceUpdateSink& operator=(IDataSourceUpdateSink&&) = delete; | ||
virtual ~IDataSourceUpdateSink() = default; | ||
|
||
protected: | ||
IDataSourceUpdateSink() = default; | ||
}; | ||
|
||
bool operator==(ItemDescriptor const& lhs, ItemDescriptor const& rhs); | ||
|
||
} // namespace launchdarkly::client_side |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
#pragma once | ||
|
||
#include <string> | ||
|
||
namespace launchdarkly::client_side::data_sources::detail { | ||
|
||
/** | ||
* Return a base64 encoded version of the input string. | ||
* This version is URL safe, which means where a typical '+' or '/' are used | ||
* instead a '-' or '/' will be used. | ||
* @param input The string to Base64 encode. | ||
* @return The encoded value. | ||
*/ | ||
std::string Base64UrlEncode(std::string const& input); | ||
|
||
} // namespace launchdarkly::client_size |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
#pragma once | ||
|
||
#include <boost/asio/any_io_executor.hpp> | ||
|
||
#include "config/detail/service_endpoints.hpp" | ||
#include "context.hpp" | ||
#include "data/evaluation_result.hpp" | ||
#include "launchdarkly/client_side/data_source.hpp" | ||
#include "launchdarkly/client_side/data_source_update_sink.hpp" | ||
#include "launchdarkly/sse/client.hpp" | ||
#include "logger.hpp" | ||
|
||
namespace launchdarkly::client_side::data_sources::detail { | ||
|
||
/** | ||
* This class handles events source events, parses them, and then uses | ||
* a IDataSourceUpdateSink to process the parsed events. | ||
*/ | ||
class StreamingDataHandler { | ||
public: | ||
/** | ||
* Status indicating if the message was processed, or if there | ||
* was an issue encountered. | ||
*/ | ||
enum class MessageStatus { | ||
kMessageHandled, | ||
kInvalidMessage, | ||
kUnhandledVerb | ||
}; | ||
|
||
/** | ||
* Represents patch JSON from the LaunchDarkly service. | ||
*/ | ||
struct PatchData { | ||
std::string key; | ||
EvaluationResult flag; | ||
}; | ||
|
||
/** | ||
* Represents delete JSON from the LaunchDarkly service. | ||
*/ | ||
struct DeleteData { | ||
std::string key; | ||
uint64_t version; | ||
}; | ||
|
||
StreamingDataHandler(std::shared_ptr<IDataSourceUpdateSink> handler, | ||
Logger const& logger); | ||
|
||
/** | ||
* Handle an SSE event. | ||
* @param event The event to handle. | ||
* @return A status indicating if the message could be handled. | ||
*/ | ||
MessageStatus handle_message(launchdarkly::sse::Event const& event); | ||
|
||
private: | ||
std::shared_ptr<IDataSourceUpdateSink> handler_; | ||
Logger const& logger_; | ||
}; | ||
} // namespace launchdarkly::client_side::data_sources::detail |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
#pragma once | ||
|
||
#include <chrono> | ||
using namespace std::chrono_literals; | ||
|
||
#include <boost/asio/any_io_executor.hpp> | ||
|
||
#include "config/detail/http_properties.hpp" | ||
#include "config/detail/service_endpoints.hpp" | ||
#include "context.hpp" | ||
#include "data/evaluation_result.hpp" | ||
#include "launchdarkly/client_side/data_source.hpp" | ||
#include "launchdarkly/client_side/data_source_update_sink.hpp" | ||
#include "launchdarkly/client_side/data_sources/detail/streaming_data_handler.hpp" | ||
#include "launchdarkly/sse/client.hpp" | ||
#include "logger.hpp" | ||
|
||
namespace launchdarkly::client_side::data_sources::detail { | ||
|
||
class StreamingDataSource final : public IDataSource { | ||
public: | ||
StreamingDataSource(std::string const& sdk_key, | ||
boost::asio::any_io_executor ioc, | ||
Context const& context, | ||
config::ServiceEndpoints const& endpoints, | ||
config::detail::HttpProperties const& http_properties, | ||
bool use_report, | ||
bool with_reasons, | ||
std::shared_ptr<IDataSourceUpdateSink> handler, | ||
Logger const& logger); | ||
|
||
void start() override; | ||
void close() override; | ||
|
||
private: | ||
StreamingDataHandler data_source_handler_; | ||
std::string streaming_endpoint_; | ||
std::string string_context_; | ||
|
||
Logger const& logger_; | ||
std::shared_ptr<launchdarkly::sse::Client> client_; | ||
|
||
inline static const std::string streaming_path_ = "/meval"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure if this is in the next PR, but this could be extracted into Defaults for the client SDK There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I had not considered putting the paths in the defaults. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The whole process of making a streaming URL is different. So I am not sure on this yet. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
}; | ||
} // namespace launchdarkly::client_side::data_sources::detail |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
#include "launchdarkly/client_side/data_source_update_sink.hpp" | ||
|
||
namespace launchdarkly::client_side { | ||
|
||
bool operator==(ItemDescriptor const& lhs, ItemDescriptor const& rhs) { | ||
return lhs.version == rhs.version && lhs.flag == rhs.flag; | ||
} | ||
|
||
std::ostream& operator<<(std::ostream& out, ItemDescriptor const& descriptor) { | ||
out << "{"; | ||
out << " version: " << descriptor.version; | ||
if (descriptor.flag.has_value()) { | ||
out << " flag: " << descriptor.flag.value(); | ||
} else { | ||
out << " flag: <nullopt>"; | ||
} | ||
return out; | ||
} | ||
ItemDescriptor::ItemDescriptor(uint64_t version) : version(version) {} | ||
|
||
ItemDescriptor::ItemDescriptor(EvaluationResult flag) | ||
: version(flag.version()), flag(std::move(flag)) {} | ||
} // namespace launchdarkly::client_side |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
#include "launchdarkly/client_side/data_sources/detail/base_64.hpp" | ||
|
||
#include <array> | ||
#include <bitset> | ||
#include <climits> | ||
#include <cstddef> | ||
|
||
static unsigned char const kEncodeSize = 4; | ||
static unsigned char const kInputBytesPerEncodeSize = 3; | ||
|
||
// Size of the index into the base64_table. | ||
// Base64 uses a 6 bit index. | ||
static unsigned long const kIndexBits = 6UL; | ||
|
||
namespace launchdarkly::client_side::data_sources::detail { | ||
|
||
// URL safe base64 table. | ||
static std::array<unsigned char, 65> const kBase64Table{ | ||
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"}; | ||
|
||
/** | ||
* Get a bit set populated with the bits at the specific start_bit through | ||
* the count. | ||
*/ | ||
static std::bitset<kIndexBits> GetBits(std::size_t start_bit, | ||
std::size_t count, | ||
std::string const& input) { | ||
std::bitset<kIndexBits> out_set; | ||
auto out_index = 0; | ||
// Iterate the bits from the highest bit. bit 0, would be the 7th | ||
// bit in the first byte. | ||
for (auto bit_index = start_bit; bit_index < start_bit + count; | ||
bit_index++) { | ||
auto str_index = bit_index / CHAR_BIT; | ||
auto character = input[str_index]; | ||
size_t bit_in_byte = (CHAR_BIT - 1) - (bit_index % CHAR_BIT); | ||
unsigned char bit_mask = 1 << (bit_in_byte); | ||
auto bit = (bit_mask & character) != 0; | ||
out_set[out_set.size() - 1 - out_index] = bit; | ||
out_index++; | ||
} | ||
return out_set; | ||
} | ||
|
||
std::string Base64UrlEncode(std::string const& input) { | ||
auto bit_count = input.size() * CHAR_BIT; | ||
std::string out; | ||
std::size_t bit_index = 0; | ||
|
||
// Every 3 bytes takes 4 characters of output. | ||
auto reserve_size = (input.size() / kInputBytesPerEncodeSize) * kEncodeSize; | ||
// If not a multiple of 3, then we need to add 4 more bytes to the size. | ||
// This will contain the extra encoded characters and padding. | ||
if ((input.size() % kInputBytesPerEncodeSize) != 0U) { | ||
reserve_size += kEncodeSize; | ||
} | ||
out.reserve(reserve_size); | ||
|
||
while (bit_index < bit_count) { | ||
// Get either 6 bits, or the remaining number of bits. | ||
auto bits = GetBits(bit_index, | ||
std::min(kIndexBits, bit_count - bit_index), input); | ||
out.push_back(static_cast<char>(kBase64Table.at(bits.to_ulong()))); | ||
bit_index += kIndexBits; | ||
} | ||
// If the string is not divisible evenly by the kEncodeSize | ||
// then pad it with '=' until it is. | ||
if (out.size() % kEncodeSize != 0) { | ||
out.append(kEncodeSize - (out.size()) % kEncodeSize, '='); | ||
} | ||
return out; | ||
} | ||
|
||
} // namespace launchdarkly::client_side::data_sources::detail |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: could be good to explain why the sink is shared ownership on the constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not 100% sure what this will actually be. Currently I have a shared pointer to the handler type. But we have not made a data store, so I am not completely sure where the lifetimes of these will be managed. It may just change to a pointer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll make a ticket.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://app.shortcut.com/launchdarkly/story/197121/determine-how-the-data-store-will-be-owned-and-what-the-relationship-with-the-update-sink-will-be