Skip to content

RCBC-487: Implement zone aware replica reads #163

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ext/rcb_backend.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,11 @@ initialize_cluster_options(const core::utils::connection_string& connstr,
cluster_options.network().preferred_network(param.value());
}

static const auto server_group = rb_id2sym(rb_intern("preferred_server_group"));
if (auto group = options::get_string(options, server_group); group) {
cluster_options.network().preferred_server_group(group.value());
}

static const auto sym_use_ip_protocol = rb_id2sym(rb_intern("use_ip_protocol"));
if (auto proto = options::get_symbol(options, sym_use_ip_protocol); proto) {
static const auto sym_any = rb_id2sym(rb_intern("any"));
Expand Down
81 changes: 51 additions & 30 deletions ext/rcb_crud.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -128,31 +128,40 @@ cb_Backend_document_get_any_replica(VALUE self,
VALUE id,
VALUE options)
{
auto cluster = cb_backend_to_public_api_cluster(self);
auto cluster = cb_backend_to_core_api_cluster(self);

Check_Type(bucket, T_STRING);
Check_Type(scope, T_STRING);
Check_Type(collection, T_STRING);
Check_Type(id, T_STRING);

try {
couchbase::get_any_replica_options opts;
set_timeout(opts, options);
core::document_id doc_id{
cb_string_new(bucket),
cb_string_new(scope),
cb_string_new(collection),
cb_string_new(id),
};

auto f = cluster.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
.get_any_replica(cb_string_new(id), opts);
auto [ctx, resp] = cb_wait_for_future(f);
if (ctx.ec()) {
cb_throw_error(ctx, "unable to get replica of the document");
core::operations::get_any_replica_request req{ doc_id };
cb_extract_timeout(req, options);
cb_extract_read_preference(req, options);

std::promise<core::operations::get_any_replica_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
if (resp.ctx.ec()) {
cb_throw_error(resp.ctx, "unable to get replica of the document");
}

auto value = resp.content_as<passthrough_transcoder>();
VALUE res = rb_hash_new();
rb_hash_aset(res, rb_id2sym(rb_intern("content")), cb_str_new(value.data));
rb_hash_aset(res, rb_id2sym(rb_intern("cas")), cb_cas_to_num(resp.cas()));
rb_hash_aset(res, rb_id2sym(rb_intern("flags")), UINT2NUM(value.flags));
rb_hash_aset(res, rb_id2sym(rb_intern("content")), cb_str_new(resp.value));
rb_hash_aset(res, rb_id2sym(rb_intern("cas")), cb_cas_to_num(resp.cas));
rb_hash_aset(res, rb_id2sym(rb_intern("flags")), UINT2NUM(resp.flags));
rb_hash_aset(res, rb_id2sym(rb_intern("replica")), resp.replica ? Qtrue : Qfalse);
return res;
} catch (const std::system_error& se) {
rb_exc_raise(cb_map_error_code(
Expand All @@ -171,33 +180,43 @@ cb_Backend_document_get_all_replicas(VALUE self,
VALUE id,
VALUE options)
{
auto cluster = cb_backend_to_public_api_cluster(self);
auto cluster = cb_backend_to_core_api_cluster(self);

Check_Type(bucket, T_STRING);
Check_Type(scope, T_STRING);
Check_Type(collection, T_STRING);
Check_Type(id, T_STRING);

try {
couchbase::get_all_replicas_options opts;
set_timeout(opts, options);
core::document_id doc_id{
cb_string_new(bucket),
cb_string_new(scope),
cb_string_new(collection),
cb_string_new(id),
};

auto f = cluster.bucket(cb_string_new(bucket))
.scope(cb_string_new(scope))
.collection(cb_string_new(collection))
.get_all_replicas(cb_string_new(id), opts);
auto [ctx, resp] = cb_wait_for_future(f);
if (ctx.ec()) {
cb_throw_error(ctx, "unable to get all replicas for the document");
core::operations::get_all_replicas_request req{ doc_id };
cb_extract_timeout(req, options);
cb_extract_read_preference(req, options);

std::promise<core::operations::get_all_replicas_response> promise;
auto f = promise.get_future();
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
promise.set_value(std::forward<decltype(resp)>(resp));
});
auto resp = cb_wait_for_future(f);
if (resp.ctx.ec()) {
cb_throw_error(resp.ctx, "unable to get all replicas for the document");
}

VALUE res = rb_ary_new_capa(static_cast<long>(resp.size()));
for (const auto& entry : resp) {
VALUE res = rb_ary_new_capa(static_cast<long>(resp.entries.size()));

for (const auto& entry : resp.entries) {
VALUE response = rb_hash_new();
auto value = entry.content_as<passthrough_transcoder>();
rb_hash_aset(response, rb_id2sym(rb_intern("content")), cb_str_new(value.data));
rb_hash_aset(response, rb_id2sym(rb_intern("cas")), cb_cas_to_num(entry.cas()));
rb_hash_aset(response, rb_id2sym(rb_intern("flags")), UINT2NUM(value.flags));
rb_hash_aset(response, rb_id2sym(rb_intern("content")), cb_str_new(entry.value));
rb_hash_aset(response, rb_id2sym(rb_intern("cas")), cb_cas_to_num(entry.cas));
rb_hash_aset(response, rb_id2sym(rb_intern("flags")), UINT2NUM(entry.flags));
rb_hash_aset(response, rb_id2sym(rb_intern("replica")), entry.replica ? Qtrue : Qfalse);
rb_ary_push(res, response);
}
return res;
Expand Down Expand Up @@ -1107,6 +1126,7 @@ cb_Backend_document_lookup_in_any_replica(VALUE self,

core::operations::lookup_in_any_replica_request req{ doc_id };
cb_extract_timeout(req, options);
cb_extract_read_preference(req, options);

static VALUE xattr_property = rb_id2sym(rb_intern("xattr"));
static VALUE path_property = rb_id2sym(rb_intern("path"));
Expand Down Expand Up @@ -1234,6 +1254,7 @@ cb_Backend_document_lookup_in_all_replicas(VALUE self,

core::operations::lookup_in_all_replicas_request req{ doc_id };
cb_extract_timeout(req, options);
cb_extract_read_preference(req, options);

static VALUE xattr_property = rb_id2sym(rb_intern("xattr"));
static VALUE path_property = rb_id2sym(rb_intern("path"));
Expand Down
32 changes: 32 additions & 0 deletions ext/rcb_utils.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <couchbase/cas.hxx>
#include <couchbase/durability_level.hxx>
#include <couchbase/persist_to.hxx>
#include <couchbase/read_preference.hxx>
#include <couchbase/replicate_to.hxx>
#include <couchbase/store_semantics.hxx>

Expand Down Expand Up @@ -128,6 +129,37 @@ cb_extract_timeout(Request& req, VALUE options)
}
}

template<typename Request>
inline void
cb_extract_read_preference(Request& req, VALUE options)
{
static VALUE property_name = rb_id2sym(rb_intern("read_preference"));
if (!NIL_P(options)) {
if (TYPE(options) != T_HASH) {
throw ruby_exception(rb_eArgError,
rb_sprintf("expected options to be Hash, given %+" PRIsVALUE, options));
}

VALUE val = rb_hash_aref(options, property_name);
if (NIL_P(val)) {
return;
}
if (TYPE(val) != T_SYMBOL) {
throw ruby_exception(
rb_eArgError, rb_sprintf("read_preference must be a Symbol, but given %+" PRIsVALUE, val));
}

if (ID mode = rb_sym2id(val); mode == rb_intern("no_preference")) {
req.read_preference = couchbase::read_preference::no_preference;
} else if (mode == rb_intern("selected_server_group")) {
req.read_preference = couchbase::read_preference::selected_server_group;
} else {
throw ruby_exception(rb_eArgError,
rb_sprintf("unexpected read_preference, given %+" PRIsVALUE, val));
}
}
}

template<typename Field>
inline void
cb_extract_duration(Field& field, VALUE options, const char* name)
Expand Down
Loading
Loading