Skip to content

feat: Implement streaming data source. #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Apr 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion apps/hello-cpp/main.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include <launchdarkly/api.hpp>
#include <launchdarkly/sse/client.hpp>
#include "launchdarkly/client_side/api.hpp"

#include <boost/asio/io_context.hpp>

Expand Down
2 changes: 1 addition & 1 deletion bindings/c/src/api.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#include "launchdarkly/client_side/api.hpp"
#include <launchdarkly/api.h>
#include <launchdarkly/api.hpp>

bool launchdarkly_foo(int32_t* out_result) {
if (auto val = launchdarkly::foo()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#pragma once

namespace launchdarkly::client_side {

class IDataSource {
public:
virtual void start() = 0;
virtual void close() = 0;

virtual ~IDataSource() = default;
IDataSource(IDataSource const& item) = delete;
IDataSource(IDataSource&& item) = delete;
IDataSource& operator=(IDataSource const&) = delete;
IDataSource& operator=(IDataSource&&) = delete;

protected:
IDataSource() = default;
};

} // namespace launchdarkly::client_side
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
#pragma once

#include <map>
#include <string>

#include "config/detail/service_endpoints.hpp"
#include "context.hpp"
#include "data/evaluation_result.hpp"

namespace launchdarkly::client_side {

/**
* An item descriptor is an abstraction that allows for Flag data to be
* handled using the same type in both a put or a patch.
*/
struct ItemDescriptor {
/**
* The version number of this data, provided by the SDK.
*/
uint64_t version;

/**
* The data item, or nullopt if this is a deleted item placeholder.
*/
std::optional<EvaluationResult> flag;

explicit ItemDescriptor(uint64_t version);

explicit ItemDescriptor(EvaluationResult flag);

ItemDescriptor(ItemDescriptor const& item) = default;
ItemDescriptor(ItemDescriptor&& item) = default;
ItemDescriptor& operator=(ItemDescriptor const&) = default;
ItemDescriptor& operator=(ItemDescriptor&&) = default;
~ItemDescriptor() = default;

friend std::ostream& operator<<(std::ostream& out,
ItemDescriptor const& descriptor);
};

/**
* Interface for handling updates from LaunchDarkly.
*/
class IDataSourceUpdateSink {
public:
virtual void init(std::map<std::string, ItemDescriptor> data) = 0;
virtual void upsert(std::string key, ItemDescriptor) = 0;

// We could add this if we want to support data source status.
// virtual void status(<something>)

IDataSourceUpdateSink(IDataSourceUpdateSink const& item) = delete;
IDataSourceUpdateSink(IDataSourceUpdateSink&& item) = delete;
IDataSourceUpdateSink& operator=(IDataSourceUpdateSink const&) = delete;
IDataSourceUpdateSink& operator=(IDataSourceUpdateSink&&) = delete;
virtual ~IDataSourceUpdateSink() = default;

protected:
IDataSourceUpdateSink() = default;
};

bool operator==(ItemDescriptor const& lhs, ItemDescriptor const& rhs);

} // namespace launchdarkly::client_side
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#pragma once

#include <string>

namespace launchdarkly::client_side::data_sources::detail {

/**
* Return a base64 encoded version of the input string.
* This version is URL safe, which means where a typical '+' or '/' are used
* instead a '-' or '/' will be used.
* @param input The string to Base64 encode.
* @return The encoded value.
*/
std::string Base64UrlEncode(std::string const& input);

} // namespace launchdarkly::client_size
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
#pragma once

#include <boost/asio/any_io_executor.hpp>

#include "config/detail/service_endpoints.hpp"
#include "context.hpp"
#include "data/evaluation_result.hpp"
#include "launchdarkly/client_side/data_source.hpp"
#include "launchdarkly/client_side/data_source_update_sink.hpp"
#include "launchdarkly/sse/client.hpp"
#include "logger.hpp"

namespace launchdarkly::client_side::data_sources::detail {

/**
* This class handles events source events, parses them, and then uses
* a IDataSourceUpdateSink to process the parsed events.
*/
class StreamingDataHandler {
public:
/**
* Status indicating if the message was processed, or if there
* was an issue encountered.
*/
enum class MessageStatus {
kMessageHandled,
kInvalidMessage,
kUnhandledVerb
};

/**
* Represents patch JSON from the LaunchDarkly service.
*/
struct PatchData {
std::string key;
EvaluationResult flag;
};

/**
* Represents delete JSON from the LaunchDarkly service.
*/
struct DeleteData {
std::string key;
uint64_t version;
};

StreamingDataHandler(std::shared_ptr<IDataSourceUpdateSink> handler,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: could be good to explain why the sink is shared ownership on the constructor

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not 100% sure what this will actually be. Currently I have a shared pointer to the handler type. But we have not made a data store, so I am not completely sure where the lifetimes of these will be managed. It may just change to a pointer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll make a ticket.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logger const& logger);

/**
* Handle an SSE event.
* @param event The event to handle.
* @return A status indicating if the message could be handled.
*/
MessageStatus handle_message(launchdarkly::sse::Event const& event);

private:
std::shared_ptr<IDataSourceUpdateSink> handler_;
Logger const& logger_;
};
} // namespace launchdarkly::client_side::data_sources::detail
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#pragma once

#include <chrono>
using namespace std::chrono_literals;

#include <boost/asio/any_io_executor.hpp>

#include "config/detail/http_properties.hpp"
#include "config/detail/service_endpoints.hpp"
#include "context.hpp"
#include "data/evaluation_result.hpp"
#include "launchdarkly/client_side/data_source.hpp"
#include "launchdarkly/client_side/data_source_update_sink.hpp"
#include "launchdarkly/client_side/data_sources/detail/streaming_data_handler.hpp"
#include "launchdarkly/sse/client.hpp"
#include "logger.hpp"

namespace launchdarkly::client_side::data_sources::detail {

class StreamingDataSource final : public IDataSource {
public:
StreamingDataSource(std::string const& sdk_key,
boost::asio::any_io_executor ioc,
Context const& context,
config::ServiceEndpoints const& endpoints,
config::detail::HttpProperties const& http_properties,
bool use_report,
bool with_reasons,
std::shared_ptr<IDataSourceUpdateSink> handler,
Logger const& logger);

void start() override;
void close() override;

private:
StreamingDataHandler data_source_handler_;
std::string streaming_endpoint_;
std::string string_context_;

Logger const& logger_;
std::shared_ptr<launchdarkly::sse::Client> client_;

inline static const std::string streaming_path_ = "/meval";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is in the next PR, but this could be extracted into Defaults for the client SDK

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had not considered putting the paths in the defaults.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The whole process of making a streaming URL is different. So I am not sure on this yet.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

};
} // namespace launchdarkly::client_side::data_sources::detail
4 changes: 3 additions & 1 deletion libs/client-sdk/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
file(GLOB HEADER_LIST CONFIGURE_DEPENDS "${LaunchDarklyCPPClient_SOURCE_DIR}/include/launchdarkly/*.hpp")

# Automatic library: static or dynamic based on user config.
add_library(${LIBNAME} api.cpp ${HEADER_LIST})
add_library(${LIBNAME} api.cpp ${HEADER_LIST} data_sources/streaming_data_source.cpp data_sources/base_64.cpp data_sources/streaming_data_handler.cpp data_source_update_sink.cpp)

target_link_libraries(${LIBNAME} launchdarkly::common launchdarkly::sse)

add_library(launchdarkly::client ALIAS ${LIBNAME})

Expand Down
2 changes: 1 addition & 1 deletion libs/client-sdk/src/api.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "launchdarkly/api.hpp"
#include "launchdarkly/client_side/api.hpp"

#include <cstdint>
#include <optional>
Expand Down
23 changes: 23 additions & 0 deletions libs/client-sdk/src/data_source_update_sink.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#include "launchdarkly/client_side/data_source_update_sink.hpp"

namespace launchdarkly::client_side {

bool operator==(ItemDescriptor const& lhs, ItemDescriptor const& rhs) {
return lhs.version == rhs.version && lhs.flag == rhs.flag;
}

std::ostream& operator<<(std::ostream& out, ItemDescriptor const& descriptor) {
out << "{";
out << " version: " << descriptor.version;
if (descriptor.flag.has_value()) {
out << " flag: " << descriptor.flag.value();
} else {
out << " flag: <nullopt>";
}
return out;
}
ItemDescriptor::ItemDescriptor(uint64_t version) : version(version) {}

ItemDescriptor::ItemDescriptor(EvaluationResult flag)
: version(flag.version()), flag(std::move(flag)) {}
} // namespace launchdarkly::client_side
74 changes: 74 additions & 0 deletions libs/client-sdk/src/data_sources/base_64.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#include "launchdarkly/client_side/data_sources/detail/base_64.hpp"

#include <array>
#include <bitset>
#include <climits>
#include <cstddef>

static unsigned char const kEncodeSize = 4;
static unsigned char const kInputBytesPerEncodeSize = 3;

// Size of the index into the base64_table.
// Base64 uses a 6 bit index.
static unsigned long const kIndexBits = 6UL;

namespace launchdarkly::client_side::data_sources::detail {

// URL safe base64 table.
static std::array<unsigned char, 65> const kBase64Table{
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_"};

/**
* Get a bit set populated with the bits at the specific start_bit through
* the count.
*/
static std::bitset<kIndexBits> GetBits(std::size_t start_bit,
std::size_t count,
std::string const& input) {
std::bitset<kIndexBits> out_set;
auto out_index = 0;
// Iterate the bits from the highest bit. bit 0, would be the 7th
// bit in the first byte.
for (auto bit_index = start_bit; bit_index < start_bit + count;
bit_index++) {
auto str_index = bit_index / CHAR_BIT;
auto character = input[str_index];
size_t bit_in_byte = (CHAR_BIT - 1) - (bit_index % CHAR_BIT);
unsigned char bit_mask = 1 << (bit_in_byte);
auto bit = (bit_mask & character) != 0;
out_set[out_set.size() - 1 - out_index] = bit;
out_index++;
}
return out_set;
}

std::string Base64UrlEncode(std::string const& input) {
auto bit_count = input.size() * CHAR_BIT;
std::string out;
std::size_t bit_index = 0;

// Every 3 bytes takes 4 characters of output.
auto reserve_size = (input.size() / kInputBytesPerEncodeSize) * kEncodeSize;
// If not a multiple of 3, then we need to add 4 more bytes to the size.
// This will contain the extra encoded characters and padding.
if ((input.size() % kInputBytesPerEncodeSize) != 0U) {
reserve_size += kEncodeSize;
}
out.reserve(reserve_size);

while (bit_index < bit_count) {
// Get either 6 bits, or the remaining number of bits.
auto bits = GetBits(bit_index,
std::min(kIndexBits, bit_count - bit_index), input);
out.push_back(static_cast<char>(kBase64Table.at(bits.to_ulong())));
bit_index += kIndexBits;
}
// If the string is not divisible evenly by the kEncodeSize
// then pad it with '=' until it is.
if (out.size() % kEncodeSize != 0) {
out.append(kEncodeSize - (out.size()) % kEncodeSize, '=');
}
return out;
}

} // namespace launchdarkly::client_side::data_sources::detail
Loading