Skip to content

feat: add event summarizer #19

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 14 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .clang-tidy
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
---
CheckOptions:
- { key: readability-identifier-length.IgnoredParameterNames, value: 'i|j|k|c|os|it' }
- { key: readability-identifier-length.IgnoredVariableNames, value: 'ec' }
- { key: readability-identifier-length.IgnoredLoopCounterNames, value: 'i|j|k|c|os|it' }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

boost::system::error_code ec is all over the boost docs.

3 changes: 3 additions & 0 deletions libs/common/include/events/client_events.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ struct FeatureEventParams {
std::string key;
Context context;
EvaluationResult eval_result;
Value default_;
};

struct FeatureEventBase {
Expand All @@ -29,6 +30,8 @@ struct FeatureEventBase {
Value value;
std::optional<Reason> reason;
Value default_;

explicit FeatureEventBase(FeatureEventParams const& params);
};

struct FeatureEvent : public FeatureEventBase {
Expand Down
21 changes: 0 additions & 21 deletions libs/common/include/events/common_events.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,6 @@ struct Date {
std::chrono::system_clock::time_point t;
};

struct VariationSummary {
std::size_t count;
Value value;
};

struct VariationKey {
Version version;
std::optional<VariationIndex> variation;

struct Hash {
auto operator()(VariationKey const& p) const -> size_t {
if (p.variation) {
return std::hash<Version>{}(p.version) ^
std::hash<VariationIndex>{}(*p.variation);
} else {
return std::hash<Version>{}(p.version);
}
}
};
};

struct TrackEventParams {
Date creation_date;
std::string key;
Expand Down
4 changes: 2 additions & 2 deletions libs/common/include/events/detail/asio_event_processor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "context_filter.hpp"
#include "events/detail/conn_pool.hpp"
#include "events/detail/outbox.hpp"
#include "events/detail/summary_state.hpp"
#include "events/detail/summarizer.hpp"
#include "events/event_processor.hpp"
#include "events/events.hpp"
#include "logger.hpp"
Expand Down Expand Up @@ -43,7 +43,7 @@ class AsioEventProcessor : public IEventProcessor {

boost::asio::any_io_executor io_;
Outbox outbox_;
SummaryState summary_state_;
Summarizer summarizer_;

std::chrono::milliseconds flush_interval_;
boost::asio::steady_timer timer_;
Expand Down
110 changes: 110 additions & 0 deletions libs/common/include/events/detail/summarizer.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
#pragma once
#include <boost/container_hash/hash.hpp>
#include <chrono>
#include <functional>
#include <set>
#include <unordered_map>
#include <unordered_set>
#include "events/events.hpp"
#include "value.hpp"

namespace launchdarkly::events::detail {

/**
* Summarizer is responsible for accepting FeatureEventParams (the context
* related to a feature evaluation) and outputting summary events (which
* essentially condenses the various evaluation results into a single
* structure).
*/
class Summarizer {
public:
using Time = std::chrono::system_clock::time_point;
using FlagKey = std::string;

/**
* Construct a Summarizer starting at the given time.
* @param start Start time of the summary.
*/
explicit Summarizer(Time start_time);

/**
* Construct a Summarizer at time zero.
*/
Summarizer() = default;

/**
* Updates the summary with a feature event.
* @param event Feature event.
*/
void Update(client::FeatureEventParams const& event);

/**
* Marks the summary as finished at a given timestamp.
* @param end_time End time of the summary.
*/
Summarizer& Finish(Time end_time);

/**
* Returns true if the summary is empty.
*/
[[nodiscard]] bool Empty() const;

/**
* Returns the summary's start time as given in the constructor.
*/
[[nodiscard]] Time start_time() const;

/**
* Returns the summary's end time as specified using Finish.
*/
[[nodiscard]] Time end_time() const;

struct VariationSummary {
public:
explicit VariationSummary(Value value);
void Increment();
[[nodiscard]] std::int32_t count() const;
[[nodiscard]] Value const& value() const;

private:
std::int32_t count_;
Value value_;
};

struct VariationKey {
std::optional<Version> version;
std::optional<VariationIndex> variation;

VariationKey();
VariationKey(Version version, std::optional<VariationIndex> variation);

bool operator==(VariationKey const& k) const {
return k.variation == variation && k.version == version;
}

bool operator<(VariationKey const& k) const {
if (variation < k.variation) {
return true;
}
return version < k.version;
}
};

struct State {
Value default_;
std::set<std::string> context_kinds;
std::map<Summarizer::VariationKey, Summarizer::VariationSummary>
counters;

explicit State(Value defaultVal);
};

[[nodiscard]] std::unordered_map<FlagKey, State> const& features() const;

private:
Time start_time_;
Summarizer::Time end_time_;
std::unordered_map<FlagKey, State> features_;
};

} // namespace launchdarkly::events::detail
23 changes: 0 additions & 23 deletions libs/common/include/events/detail/summary_state.hpp

This file was deleted.

11 changes: 11 additions & 0 deletions libs/common/include/serialization/events/json_events.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <boost/json.hpp>

#include "events/detail/summarizer.hpp"
#include "events/events.hpp"

namespace launchdarkly::events::client {
Expand Down Expand Up @@ -37,3 +38,13 @@ void tag_invoke(boost::json::value_from_tag const&,
OutputEvent const& event);

} // namespace launchdarkly::events

namespace launchdarkly::events::detail {

void tag_invoke(boost::json::value_from_tag const&,
boost::json::value& json_value,
Summarizer::State const& state);
void tag_invoke(boost::json::value_from_tag const&,
boost::json::value& json_value,
Summarizer const& summary);
} // namespace launchdarkly::events::detail
3 changes: 2 additions & 1 deletion libs/common/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ add_library(${LIBNAME}
events/asio_event_processor.cpp
events/outbox.cpp
events/conn_pool.cpp
events/summary_state.cpp
events/summarizer.cpp
events/client_events.cpp
config/http_properties.cpp
config/data_source_builder.cpp
config/http_properties_builder.cpp)
Expand Down
106 changes: 50 additions & 56 deletions libs/common/src/events/asio_event_processor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,15 @@ AsioEventProcessor::AsioEventProcessor(
Logger& logger)
: io_(boost::asio::make_strand(io)),
outbox_(config.capacity()),
summary_state_(std::chrono::system_clock::now()),
summarizer_(std::chrono::system_clock::now()),
flush_interval_(config.flush_interval()),
timer_(io_),
host_(endpoints.events_base_url()), // TODO: parse and use host
path_(config.path()),
authorization_(std::move(authorization)),
uuids_(),
conns_(),
inbox_capacity_(config.capacity()),
inbox_size_(0),
inbox_mutex_(),
full_outbox_encountered_(false),
full_inbox_encountered_(false),
filter_(config.all_attributes_private(), config.private_attributes()),
Expand Down Expand Up @@ -68,16 +66,14 @@ void AsioEventProcessor::AsyncSend(InputEvent input_event) {
if (!InboxIncrement()) {
return;
}
boost::asio::post(io_, [this, e = std::move(input_event)]() mutable {
boost::asio::post(io_, [this, event = std::move(input_event)]() mutable {
InboxDecrement();
HandleSend(std::move(e));
HandleSend(std::move(event));
});
}

void AsioEventProcessor::HandleSend(InputEvent e) {
summary_state_.update(e);

std::vector<OutputEvent> output_events = Process(std::move(e));
void AsioEventProcessor::HandleSend(InputEvent event) {
std::vector<OutputEvent> output_events = Process(std::move(event));

bool inserted = outbox_.PushDiscardingOverflow(std::move(output_events));
if (!inserted && !full_outbox_encountered_) {
Expand All @@ -95,6 +91,7 @@ void AsioEventProcessor::Flush(FlushTrigger flush_type) {
LD_LOG(logger_, LogLevel::kDebug)
<< "event-processor: nothing to flush";
}
summarizer_ = Summarizer(std::chrono::system_clock::now());
if (flush_type == FlushTrigger::Automatic) {
ScheduleFlush();
}
Expand Down Expand Up @@ -133,8 +130,15 @@ AsioEventProcessor::MakeRequest() {
return std::nullopt;
}

auto events = boost::json::value_from(outbox_.Consume());

if (!summarizer_.Finish(std::chrono::system_clock::now()).Empty()) {
events.as_array().push_back(boost::json::value_from(summarizer_));
}

LD_LOG(logger_, LogLevel::kDebug)
<< "event-processor: generating http request";

RequestType req;

req.set(http::field::host, host_);
Expand All @@ -145,8 +149,7 @@ AsioEventProcessor::MakeRequest() {
req.set(kPayloadIdHeader, boost::lexical_cast<std::string>(uuids_()));
req.target(host_ + path_);

req.body() =
boost::json::serialize(boost::json::value_from(outbox_.Consume()));
req.body() = boost::json::serialize(events);
req.prepare_payload();
return req;
}
Expand All @@ -160,53 +163,44 @@ struct overloaded : Ts... {
template <class... Ts>
overloaded(Ts...) -> overloaded<Ts...>;

std::vector<OutputEvent> AsioEventProcessor::Process(InputEvent event) {
std::vector<OutputEvent> AsioEventProcessor::Process(InputEvent input_event) {
std::vector<OutputEvent> out;
std::visit(
overloaded{
[&](client::FeatureEventParams&& e) {
if (!e.eval_result.track_events()) {
return;
}
std::optional<Reason> reason;

// TODO(cwaldren): should also add the reason if the variation
// method was VariationDetail().
if (e.eval_result.track_reason()) {
reason = e.eval_result.detail().reason();
}

client::FeatureEventBase b = {
e.creation_date, std::move(e.key), e.eval_result.version(),
e.eval_result.detail().variation_index(),
e.eval_result.detail().value(), reason,
// TODO(cwaldren): change to actual default; figure out
// where this should be plumbed through.
Value::null()};

auto debug_until_date = e.eval_result.debug_events_until_date();
bool emit_debug_event =
debug_until_date &&
debug_until_date.value() > std::chrono::system_clock::now();

if (emit_debug_event) {
out.emplace_back(
client::DebugEvent{b, filter_.filter(e.context)});
}
// TODO(cwaldren): see about not copying the keys / having the
// getter return a value.
out.emplace_back(client::FeatureEvent{
std::move(b), e.context.kinds_to_keys()});
},
[&](client::IdentifyEventParams&& e) {
// Contexts should already have been checked for
// validity by this point.
assert(e.context.valid());
out.emplace_back(client::IdentifyEvent{
e.creation_date, filter_.filter(e.context)});
},
[&](TrackEventParams&& e) { out.emplace_back(std::move(e)); }},
std::move(event));
overloaded{[&](client::FeatureEventParams&& event) {
summarizer_.Update(event);

if (!event.eval_result.track_events()) {
return;
}

client::FeatureEventBase base{event};

auto debug_until_date =
event.eval_result.debug_events_until_date();

bool emit_debug_event =
debug_until_date &&
debug_until_date.value() >
std::chrono::system_clock::now();

if (emit_debug_event) {
out.emplace_back(client::DebugEvent{
base, filter_.filter(event.context)});
}
out.emplace_back(client::FeatureEvent{
std::move(base), event.context.kinds_to_keys()});
},
[&](client::IdentifyEventParams&& event) {
// Contexts should already have been checked for
// validity by this point.
assert(event.context.valid());
out.emplace_back(client::IdentifyEvent{
event.creation_date, filter_.filter(event.context)});
},
[&](TrackEventParams&& event) {
out.emplace_back(std::move(event));
}},
std::move(input_event));

return out;
}
Expand Down
Loading