Skip to content

Commit 24d2c3c

Browse files
Matt-Wozavsej
authored andcommitted
Implement zone aware replica reads
1 parent 160fd14 commit 24d2c3c

File tree

7 files changed

+187
-30
lines changed

7 files changed

+187
-30
lines changed

ext/rcb_backend.cxx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,11 @@ initialize_cluster_options(const core::utils::connection_string& connstr,
208208
cluster_options.network().preferred_network(param.value());
209209
}
210210

211+
static const auto server_group = rb_id2sym(rb_intern("preferred_server_group"));
212+
if (auto group = options::get_string(options, server_group); group) {
213+
cluster_options.network().preferred_server_group(group.value());
214+
}
215+
211216
static const auto sym_use_ip_protocol = rb_id2sym(rb_intern("use_ip_protocol"));
212217
if (auto proto = options::get_symbol(options, sym_use_ip_protocol); proto) {
213218
static const auto sym_any = rb_id2sym(rb_intern("any"));

ext/rcb_crud.cxx

Lines changed: 51 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -128,31 +128,40 @@ cb_Backend_document_get_any_replica(VALUE self,
128128
VALUE id,
129129
VALUE options)
130130
{
131-
auto cluster = cb_backend_to_public_api_cluster(self);
131+
auto cluster = cb_backend_to_core_api_cluster(self);
132132

133133
Check_Type(bucket, T_STRING);
134134
Check_Type(scope, T_STRING);
135135
Check_Type(collection, T_STRING);
136136
Check_Type(id, T_STRING);
137137

138138
try {
139-
couchbase::get_any_replica_options opts;
140-
set_timeout(opts, options);
139+
core::document_id doc_id{
140+
cb_string_new(bucket),
141+
cb_string_new(scope),
142+
cb_string_new(collection),
143+
cb_string_new(id),
144+
};
141145

142-
auto f = cluster.bucket(cb_string_new(bucket))
143-
.scope(cb_string_new(scope))
144-
.collection(cb_string_new(collection))
145-
.get_any_replica(cb_string_new(id), opts);
146-
auto [ctx, resp] = cb_wait_for_future(f);
147-
if (ctx.ec()) {
148-
cb_throw_error(ctx, "unable to get replica of the document");
146+
core::operations::get_any_replica_request req{ doc_id };
147+
cb_extract_timeout(req, options);
148+
cb_extract_read_preference(req, options);
149+
150+
std::promise<core::operations::get_any_replica_response> promise;
151+
auto f = promise.get_future();
152+
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
153+
promise.set_value(std::forward<decltype(resp)>(resp));
154+
});
155+
auto resp = cb_wait_for_future(f);
156+
if (resp.ctx.ec()) {
157+
cb_throw_error(resp.ctx, "unable to get replica of the document");
149158
}
150159

151-
auto value = resp.content_as<passthrough_transcoder>();
152160
VALUE res = rb_hash_new();
153-
rb_hash_aset(res, rb_id2sym(rb_intern("content")), cb_str_new(value.data));
154-
rb_hash_aset(res, rb_id2sym(rb_intern("cas")), cb_cas_to_num(resp.cas()));
155-
rb_hash_aset(res, rb_id2sym(rb_intern("flags")), UINT2NUM(value.flags));
161+
rb_hash_aset(res, rb_id2sym(rb_intern("content")), cb_str_new(resp.value));
162+
rb_hash_aset(res, rb_id2sym(rb_intern("cas")), cb_cas_to_num(resp.cas));
163+
rb_hash_aset(res, rb_id2sym(rb_intern("flags")), UINT2NUM(resp.flags));
164+
rb_hash_aset(res, rb_id2sym(rb_intern("replica")), resp.replica ? Qtrue : Qfalse);
156165
return res;
157166
} catch (const std::system_error& se) {
158167
rb_exc_raise(cb_map_error_code(
@@ -171,33 +180,43 @@ cb_Backend_document_get_all_replicas(VALUE self,
171180
VALUE id,
172181
VALUE options)
173182
{
174-
auto cluster = cb_backend_to_public_api_cluster(self);
183+
auto cluster = cb_backend_to_core_api_cluster(self);
175184

176185
Check_Type(bucket, T_STRING);
177186
Check_Type(scope, T_STRING);
178187
Check_Type(collection, T_STRING);
179188
Check_Type(id, T_STRING);
180189

181190
try {
182-
couchbase::get_all_replicas_options opts;
183-
set_timeout(opts, options);
191+
core::document_id doc_id{
192+
cb_string_new(bucket),
193+
cb_string_new(scope),
194+
cb_string_new(collection),
195+
cb_string_new(id),
196+
};
184197

185-
auto f = cluster.bucket(cb_string_new(bucket))
186-
.scope(cb_string_new(scope))
187-
.collection(cb_string_new(collection))
188-
.get_all_replicas(cb_string_new(id), opts);
189-
auto [ctx, resp] = cb_wait_for_future(f);
190-
if (ctx.ec()) {
191-
cb_throw_error(ctx, "unable to get all replicas for the document");
198+
core::operations::get_all_replicas_request req{ doc_id };
199+
cb_extract_timeout(req, options);
200+
cb_extract_read_preference(req, options);
201+
202+
std::promise<core::operations::get_all_replicas_response> promise;
203+
auto f = promise.get_future();
204+
cluster.execute(req, [promise = std::move(promise)](auto&& resp) mutable {
205+
promise.set_value(std::forward<decltype(resp)>(resp));
206+
});
207+
auto resp = cb_wait_for_future(f);
208+
if (resp.ctx.ec()) {
209+
cb_throw_error(resp.ctx, "unable to get all replicas for the document");
192210
}
193211

194-
VALUE res = rb_ary_new_capa(static_cast<long>(resp.size()));
195-
for (const auto& entry : resp) {
212+
VALUE res = rb_ary_new_capa(static_cast<long>(resp.entries.size()));
213+
214+
for (const auto& entry : resp.entries) {
196215
VALUE response = rb_hash_new();
197-
auto value = entry.content_as<passthrough_transcoder>();
198-
rb_hash_aset(response, rb_id2sym(rb_intern("content")), cb_str_new(value.data));
199-
rb_hash_aset(response, rb_id2sym(rb_intern("cas")), cb_cas_to_num(entry.cas()));
200-
rb_hash_aset(response, rb_id2sym(rb_intern("flags")), UINT2NUM(value.flags));
216+
rb_hash_aset(response, rb_id2sym(rb_intern("content")), cb_str_new(entry.value));
217+
rb_hash_aset(response, rb_id2sym(rb_intern("cas")), cb_cas_to_num(entry.cas));
218+
rb_hash_aset(response, rb_id2sym(rb_intern("flags")), UINT2NUM(entry.flags));
219+
rb_hash_aset(response, rb_id2sym(rb_intern("replica")), entry.replica ? Qtrue : Qfalse);
201220
rb_ary_push(res, response);
202221
}
203222
return res;
@@ -1107,6 +1126,7 @@ cb_Backend_document_lookup_in_any_replica(VALUE self,
11071126

11081127
core::operations::lookup_in_any_replica_request req{ doc_id };
11091128
cb_extract_timeout(req, options);
1129+
cb_extract_read_preference(req, options);
11101130

11111131
static VALUE xattr_property = rb_id2sym(rb_intern("xattr"));
11121132
static VALUE path_property = rb_id2sym(rb_intern("path"));
@@ -1234,6 +1254,7 @@ cb_Backend_document_lookup_in_all_replicas(VALUE self,
12341254

12351255
core::operations::lookup_in_all_replicas_request req{ doc_id };
12361256
cb_extract_timeout(req, options);
1257+
cb_extract_read_preference(req, options);
12371258

12381259
static VALUE xattr_property = rb_id2sym(rb_intern("xattr"));
12391260
static VALUE path_property = rb_id2sym(rb_intern("path"));

ext/rcb_utils.hxx

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include <couchbase/cas.hxx>
2222
#include <couchbase/durability_level.hxx>
2323
#include <couchbase/persist_to.hxx>
24+
#include <couchbase/read_preference.hxx>
2425
#include <couchbase/replicate_to.hxx>
2526
#include <couchbase/store_semantics.hxx>
2627

@@ -128,6 +129,37 @@ cb_extract_timeout(Request& req, VALUE options)
128129
}
129130
}
130131

132+
template<typename Request>
133+
inline void
134+
cb_extract_read_preference(Request& req, VALUE options)
135+
{
136+
static VALUE property_name = rb_id2sym(rb_intern("read_preference"));
137+
if (!NIL_P(options)) {
138+
if (TYPE(options) != T_HASH) {
139+
throw ruby_exception(rb_eArgError,
140+
rb_sprintf("expected options to be Hash, given %+" PRIsVALUE, options));
141+
}
142+
143+
VALUE val = rb_hash_aref(options, property_name);
144+
if (NIL_P(val)) {
145+
return;
146+
}
147+
if (TYPE(val) != T_SYMBOL) {
148+
throw ruby_exception(
149+
rb_eArgError, rb_sprintf("read_preference must be a Symbol, but given %+" PRIsVALUE, val));
150+
}
151+
152+
if (ID mode = rb_sym2id(val); mode == rb_intern("no_preference")) {
153+
req.read_preference = couchbase::read_preference::no_preference;
154+
} else if (mode == rb_intern("selected_server_group")) {
155+
req.read_preference = couchbase::read_preference::selected_server_group;
156+
} else {
157+
throw ruby_exception(rb_eArgError,
158+
rb_sprintf("unexpected read_preference, given %+" PRIsVALUE, val));
159+
}
160+
}
161+
}
162+
131163
template<typename Field>
132164
inline void
133165
cb_extract_duration(Field& field, VALUE options, const char* name)

0 commit comments

Comments
 (0)