Skip to content

Commit d1b1f43

Browse files
committed
Get rid of AsioEventProcessor wrapper
1 parent b6cd25f commit d1b1f43

File tree

6 files changed

+230
-295
lines changed

6 files changed

+230
-295
lines changed
Lines changed: 55 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,18 @@
11
#pragma once
2-
#include <atomic>
2+
33
#include <boost/asio/any_io_executor.hpp>
4+
#include <boost/asio/executor_work_guard.hpp>
45
#include <boost/asio/steady_timer.hpp>
5-
#include <queue>
6-
#include <variant>
6+
#include <boost/beast/http.hpp>
7+
#include <boost/uuid/uuid_generators.hpp>
8+
#include <chrono>
9+
#include <optional>
710
#include "config/detail/events.hpp"
811
#include "config/detail/service_hosts.hpp"
9-
#include "events/detail/dispatcher.hpp"
12+
#include "context_filter.hpp"
13+
#include "events/detail/conn_pool.hpp"
14+
#include "events/detail/outbox.hpp"
15+
#include "events/detail/summary_state.hpp"
1016
#include "events/event_processor.hpp"
1117
#include "events/events.hpp"
1218
#include "logger.hpp"
@@ -15,17 +21,57 @@ namespace launchdarkly::events::detail {
1521

1622
class AsioEventProcessor : public IEventProcessor {
1723
public:
18-
AsioEventProcessor(boost::asio::any_io_executor const& executor,
24+
AsioEventProcessor(boost::asio::any_io_executor io,
1925
config::detail::Events const& config,
2026
config::ServiceHosts const& endpoints,
27+
std::string authorization,
2128
Logger& logger);
2229

23-
void AsyncSend(InputEvent event) override;
24-
void AsyncFlush() override;
25-
void AsyncClose() override;
30+
virtual void AsyncFlush() override;
31+
32+
virtual void AsyncSend(InputEvent) override;
33+
34+
virtual void AsyncClose() override;
2635

2736
private:
37+
enum class FlushTrigger {
38+
Automatic = 0,
39+
Manual = 1,
40+
};
41+
using RequestType =
42+
boost::beast::http::request<boost::beast::http::string_body>;
43+
44+
boost::asio::any_io_executor io_;
45+
Outbox outbox_;
46+
SummaryState summary_state_;
47+
48+
std::chrono::milliseconds reaction_time_;
49+
std::chrono::milliseconds flush_interval_;
50+
boost::asio::steady_timer timer_;
51+
52+
std::string host_;
53+
std::string path_;
54+
std::string authorization_;
55+
56+
boost::uuids::random_generator uuids_;
57+
58+
ConnPool conns_;
59+
60+
bool full_outbox_encountered_;
61+
62+
launchdarkly::ContextFilter filter_;
63+
2864
Logger& logger_;
29-
Dispatcher dispatcher_;
65+
66+
void HandleSend(InputEvent e);
67+
68+
std::optional<RequestType> MakeRequest();
69+
70+
void Flush(FlushTrigger flush_type);
71+
72+
void ScheduleFlush();
73+
74+
std::vector<OutputEvent> Process(InputEvent e);
3075
};
76+
3177
} // namespace launchdarkly::events::detail

libs/common/include/events/detail/dispatcher.hpp

Lines changed: 0 additions & 76 deletions
This file was deleted.

libs/common/src/CMakeLists.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ add_library(${LIBNAME}
3939
serialization/events/json_events.cpp
4040
events/null_event_processor.cpp
4141
events/asio_event_processor.cpp
42-
events/dispatcher.cpp
4342
events/outbox.cpp
4443
events/conn_pool.cpp
4544
events/summary_state.cpp
Lines changed: 174 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,37 +1,188 @@
11
#include "events/detail/asio_event_processor.hpp"
2+
#include <boost/asio/post.hpp>
3+
#include <boost/beast/http.hpp>
24

3-
#include <boost/asio/strand.hpp>
4-
#include <chrono>
5+
#include <boost/lexical_cast.hpp>
6+
#include <boost/uuid/uuid_io.hpp>
57

8+
#include "serialization/events/json_events.hpp"
9+
10+
namespace http = boost::beast::http;
611
namespace launchdarkly::events::detail {
712

8-
AsioEventProcessor::AsioEventProcessor(
9-
boost::asio::any_io_executor const& executor,
10-
config::detail::Events const& config,
11-
config::ServiceHosts const& endpoints,
12-
Logger& logger)
13-
: logger_(logger),
14-
dispatcher_(boost::asio::make_strand(executor),
15-
config,
16-
endpoints,
17-
"password",
18-
logger) {}
13+
auto const kEventSchemaHeader = "X-LaunchDarkly-Event-Schema";
14+
auto const kPayloadIdHeader = "X-LaunchDarkly-Payload-Id";
15+
16+
auto const kEventSchemaVersion = 4;
1917

20-
void AsioEventProcessor::AsyncSend(InputEvent in_event) {
21-
LD_LOG(logger_, LogLevel::kDebug) << "processor: pushing event into inbox";
22-
dispatcher_.AsyncSend(std::move(in_event));
18+
AsioEventProcessor::AsioEventProcessor(boost::asio::any_io_executor io,
19+
config::detail::Events const& config,
20+
config::ServiceHosts const& endpoints,
21+
std::string authorization,
22+
Logger& logger)
23+
: io_(std::move(io)),
24+
outbox_(config.capacity()),
25+
summary_state_(std::chrono::system_clock::now()),
26+
flush_interval_(config.flush_interval()),
27+
timer_(io_),
28+
host_(endpoints.events_host()),
29+
path_(config.path()),
30+
authorization_(std::move(authorization)),
31+
uuids_(),
32+
full_outbox_encountered_(false),
33+
filter_(config.all_attributes_private(), config.private_attributes()),
34+
logger_(logger) {
35+
ScheduleFlush();
2336
}
2437

25-
void AsioEventProcessor::AsyncFlush() {
26-
LD_LOG(logger_, LogLevel::kDebug)
27-
<< "processor: requesting unscheduled flush";
28-
dispatcher_.AsyncFlush();
38+
void AsioEventProcessor::AsyncSend(InputEvent input_event) {
39+
boost::asio::post(io_, [this, e = std::move(input_event)]() mutable {
40+
HandleSend(std::move(e));
41+
});
42+
}
43+
44+
void AsioEventProcessor::HandleSend(InputEvent e) {
45+
summary_state_.update(e);
46+
47+
std::vector<OutputEvent> output_events = Process(std::move(e));
48+
49+
bool inserted = outbox_.push_discard_overflow(std::move(output_events));
50+
if (!inserted && !full_outbox_encountered_) {
51+
LD_LOG(logger_, LogLevel::kWarn)
52+
<< "event-processor: exceeded event queue capacity; increase "
53+
"capacity to avoid dropping events";
54+
}
55+
full_outbox_encountered_ = !inserted;
56+
}
57+
58+
void AsioEventProcessor::Flush(FlushTrigger flush_type) {
59+
if (auto request = MakeRequest()) {
60+
conns_.async_write(*request);
61+
} else {
62+
LD_LOG(logger_, LogLevel::kDebug)
63+
<< "event-processor: nothing to flush";
64+
}
65+
if (flush_type == FlushTrigger::Automatic) {
66+
ScheduleFlush();
67+
}
68+
}
69+
70+
void AsioEventProcessor::ScheduleFlush() {
71+
LD_LOG(logger_, LogLevel::kDebug) << "event-processor: scheduling flush in "
72+
<< flush_interval_.count() << "ms";
73+
74+
timer_.expires_from_now(flush_interval_);
75+
timer_.async_wait([this](boost::system::error_code ec) {
76+
if (ec) {
77+
LD_LOG(logger_, LogLevel::kDebug)
78+
<< "event-processor: flush cancelled";
79+
return;
80+
}
81+
LD_LOG(logger_, LogLevel::kDebug) << "event-processor: flush";
82+
Flush(FlushTrigger::Automatic);
83+
});
2984
}
3085

3186
void AsioEventProcessor::AsyncClose() {
32-
LD_LOG(logger_, LogLevel::kDebug) << "processor: request shutdown";
33-
dispatcher_.AsyncFlush();
34-
dispatcher_.AsyncClose();
87+
timer_.cancel();
88+
}
89+
90+
void AsioEventProcessor::AsyncFlush() {
91+
boost::asio::post(io_, [this] {
92+
boost::system::error_code ec;
93+
Flush(FlushTrigger::Manual);
94+
});
3595
}
3696

97+
std::optional<AsioEventProcessor::RequestType>
98+
AsioEventProcessor::MakeRequest() {
99+
if (outbox_.empty()) {
100+
return std::nullopt;
101+
}
102+
103+
LD_LOG(logger_, LogLevel::kDebug) << "generating http request";
104+
RequestType req;
105+
106+
req.set(http::field::host, host_);
107+
req.method(http::verb::post);
108+
req.set(http::field::content_type, "application/json");
109+
req.set(http::field::authorization, authorization_);
110+
req.set(kEventSchemaHeader, std::to_string(kEventSchemaVersion));
111+
req.set(kPayloadIdHeader, boost::lexical_cast<std::string>(uuids_()));
112+
req.target(host_ + path_);
113+
114+
req.body() =
115+
boost::json::serialize(boost::json::value_from(outbox_.consume()));
116+
req.prepare_payload();
117+
return req;
118+
}
119+
120+
static std::map<std::string, std::string> CopyContextKeys(
121+
std::map<std::string_view, std::string_view> const& refs) {
122+
std::map<std::string, std::string> copied_keys;
123+
for (auto kv : refs) {
124+
copied_keys.insert(kv);
125+
}
126+
return copied_keys;
127+
}
128+
129+
// These helpers are for the std::visit within Dispatcher::process.
130+
template <class... Ts>
131+
struct overloaded : Ts... {
132+
using Ts::operator()...;
133+
};
134+
// explicit deduction guide (not needed as of C++20)
135+
template <class... Ts>
136+
overloaded(Ts...) -> overloaded<Ts...>;
137+
138+
std::vector<OutputEvent> AsioEventProcessor::Process(InputEvent event) {
139+
std::vector<OutputEvent> out;
140+
std::visit(
141+
overloaded{
142+
[&](client::FeatureEventParams&& e) {
143+
if (!e.eval_result.track_events()) {
144+
return;
145+
}
146+
std::optional<Reason> reason;
147+
148+
// TODO(cwaldren): should also add the reason if the variation
149+
// method was VariationDetail().
150+
if (e.eval_result.track_reason()) {
151+
reason = e.eval_result.detail().reason();
152+
}
153+
154+
client::FeatureEventBase b = {
155+
e.creation_date, std::move(e.key), e.eval_result.version(),
156+
e.eval_result.detail().variation_index(),
157+
e.eval_result.detail().value(), reason,
158+
// TODO(cwaldren): change to actual default; figure out
159+
// where this should be plumbed through.
160+
Value::null()};
161+
162+
auto debug_until_date = e.eval_result.debug_events_until_date();
163+
bool emit_debug_event =
164+
debug_until_date &&
165+
debug_until_date.value() > std::chrono::system_clock::now();
166+
167+
if (emit_debug_event) {
168+
out.emplace_back(
169+
client::DebugEvent{b, filter_.filter(e.context)});
170+
}
171+
// TODO(cwaldren): see about not copying the keys / having the
172+
// getter return a value.
173+
out.emplace_back(client::FeatureEvent{
174+
std::move(b), CopyContextKeys(e.context.kinds_to_keys())});
175+
},
176+
[&](client::IdentifyEventParams&& e) {
177+
// Contexts should already have been checked for
178+
// validity by this point.
179+
assert(e.context.valid());
180+
out.emplace_back(client::IdentifyEvent{
181+
e.creation_date, filter_.filter(e.context)});
182+
},
183+
[&](TrackEventParams&& e) { out.emplace_back(std::move(e)); }},
184+
std::move(event));
185+
186+
return out;
187+
}
37188
} // namespace launchdarkly::events::detail

0 commit comments

Comments
 (0)