Skip to content

Commit 77fa45b

Browse files
authored
CXXCBC-341 Support for subdoc read from replica (#436)
1 parent 1c5c106 commit 77fa45b

25 files changed

+2250
-0
lines changed

CMakeLists.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ set(couchbase_cxx_client_FILES
233233
core/protocol/cmd_increment.cxx
234234
core/protocol/cmd_insert.cxx
235235
core/protocol/cmd_lookup_in.cxx
236+
core/protocol/cmd_lookup_in_replica.cxx
236237
core/protocol/cmd_mutate_in.cxx
237238
core/protocol/cmd_noop.cxx
238239
core/protocol/cmd_observe_seqno.cxx
@@ -311,6 +312,9 @@ set(couchbase_cxx_client_FILES
311312
core/impl/key_value_error_category.cxx
312313
core/impl/key_value_error_context.cxx
313314
core/impl/lookup_in.cxx
315+
core/impl/lookup_in_replica.cxx
316+
core/impl/lookup_in_all_replicas.cxx
317+
core/impl/lookup_in_any_replica.cxx
314318
core/impl/management_error_category.cxx
315319
core/impl/manager_error_context.cxx
316320
core/impl/match_all_query.cxx

core/impl/lookup_in_all_replicas.cxx

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2020-Present Couchbase, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#include "lookup_in_all_replicas.hxx"
19+
#include "lookup_in_replica.hxx"
20+
21+
#include "core/cluster.hxx"
22+
#include "core/error_context/key_value.hxx"
23+
#include "core/operations/document_lookup_in.hxx"
24+
#include "core/topology/configuration.hxx"
25+
26+
namespace couchbase::core::impl
27+
{
28+
29+
void
30+
initiate_lookup_in_all_replicas_operation(std::shared_ptr<cluster> core,
31+
const std::string& bucket_name,
32+
const std::string& scope_name,
33+
const std::string& collection_name,
34+
std::string document_key,
35+
const std::vector<subdoc::command>& specs,
36+
lookup_in_all_replicas_options::built options,
37+
lookup_in_all_replicas_handler&& handler)
38+
{
39+
return initiate_lookup_in_all_replicas_operation(std::move(core),
40+
bucket_name,
41+
scope_name,
42+
collection_name,
43+
std::move(document_key),
44+
specs,
45+
options.timeout,
46+
movable_lookup_in_all_replicas_handler{ std::move(handler) });
47+
}
48+
49+
void
50+
initiate_lookup_in_all_replicas_operation(std::shared_ptr<cluster> core,
51+
const std::string& bucket_name,
52+
const std::string& scope_name,
53+
const std::string& collection_name,
54+
std::string document_key,
55+
const std::vector<subdoc::command>& specs,
56+
std::optional<std::chrono::milliseconds> timeout,
57+
movable_lookup_in_all_replicas_handler&& handler)
58+
{
59+
auto request = std::make_shared<couchbase::core::impl::lookup_in_all_replicas_request>(
60+
bucket_name, scope_name, collection_name, std::move(document_key), specs, timeout);
61+
core->with_bucket_configuration(
62+
bucket_name,
63+
[core, r = std::move(request), h = std::move(handler)](std::error_code ec, const core::topology::configuration& config) mutable {
64+
if (!config.supports_subdoc_read_replica()) {
65+
ec = errc::common::feature_not_available;
66+
}
67+
68+
if (ec) {
69+
std::optional<std::string> first_error_path{};
70+
std::optional<std::size_t> first_error_index{};
71+
return h(
72+
make_subdocument_error_context(make_key_value_error_context(ec, r->id()), ec, first_error_path, first_error_index, false),
73+
lookup_in_all_replicas_result{});
74+
}
75+
struct replica_context {
76+
replica_context(movable_lookup_in_all_replicas_handler handler, std::uint32_t expected_responses)
77+
: handler_(std::move(handler))
78+
, expected_responses_(expected_responses)
79+
{
80+
}
81+
82+
movable_lookup_in_all_replicas_handler handler_;
83+
std::uint32_t expected_responses_;
84+
bool done_{ false };
85+
std::mutex mutex_{};
86+
lookup_in_all_replicas_result result_{};
87+
};
88+
auto ctx = std::make_shared<replica_context>(std::move(h), config.num_replicas.value_or(0U) + 1U);
89+
90+
for (std::size_t idx = 1U; idx <= config.num_replicas.value_or(0U); ++idx) {
91+
document_id replica_id{ r->id() };
92+
replica_id.node_index(idx);
93+
core->execute(
94+
impl::lookup_in_replica_request{ std::move(replica_id), r->specs(), r->timeout() },
95+
[ctx](impl::lookup_in_replica_response&& resp) {
96+
movable_lookup_in_all_replicas_handler local_handler{};
97+
{
98+
std::scoped_lock lock(ctx->mutex_);
99+
if (ctx->done_) {
100+
return;
101+
}
102+
--ctx->expected_responses_;
103+
if (resp.ctx.ec()) {
104+
if (ctx->expected_responses_ > 0) {
105+
// just ignore the response
106+
return;
107+
}
108+
} else {
109+
std::vector<lookup_in_replica_result::entry> entries{};
110+
for (auto& field : resp.fields) {
111+
lookup_in_replica_result::entry lookup_in_entry{};
112+
lookup_in_entry.path = field.path;
113+
lookup_in_entry.value = field.value;
114+
lookup_in_entry.exists = field.exists;
115+
lookup_in_entry.original_index = field.original_index;
116+
entries.emplace_back(lookup_in_entry);
117+
}
118+
ctx->result_.emplace_back(lookup_in_replica_result{ resp.cas, entries, resp.deleted, true /* replica */ });
119+
}
120+
if (ctx->expected_responses_ == 0) {
121+
ctx->done_ = true;
122+
std::swap(local_handler, ctx->handler_);
123+
}
124+
}
125+
if (local_handler) {
126+
if (!ctx->result_.empty()) {
127+
resp.ctx.override_ec({});
128+
}
129+
return local_handler(std::move(resp.ctx), std::move(ctx->result_));
130+
}
131+
});
132+
}
133+
134+
core::operations::lookup_in_request active{ document_id{ r->id() } };
135+
active.specs = r->specs();
136+
active.timeout = r->timeout();
137+
core->execute(active, [ctx](core::operations::lookup_in_response&& resp) {
138+
movable_lookup_in_all_replicas_handler local_handler{};
139+
{
140+
std::scoped_lock lock(ctx->mutex_);
141+
if (ctx->done_) {
142+
return;
143+
}
144+
--ctx->expected_responses_;
145+
if (resp.ctx.ec()) {
146+
if (ctx->expected_responses_ > 0) {
147+
// just ignore the response
148+
return;
149+
}
150+
} else {
151+
std::vector<lookup_in_replica_result::entry> entries{};
152+
for (auto& field : resp.fields) {
153+
lookup_in_replica_result::entry lookup_in_entry{};
154+
lookup_in_entry.path = field.path;
155+
lookup_in_entry.value = field.value;
156+
lookup_in_entry.exists = field.exists;
157+
lookup_in_entry.original_index = field.original_index;
158+
entries.emplace_back(lookup_in_entry);
159+
}
160+
ctx->result_.emplace_back(lookup_in_replica_result{ resp.cas, entries, resp.deleted, false /* active */ });
161+
}
162+
if (ctx->expected_responses_ == 0) {
163+
ctx->done_ = true;
164+
std::swap(local_handler, ctx->handler_);
165+
}
166+
}
167+
if (local_handler) {
168+
if (!ctx->result_.empty()) {
169+
resp.ctx.override_ec({});
170+
}
171+
return local_handler(std::move(resp.ctx), std::move(ctx->result_));
172+
}
173+
});
174+
});
175+
}
176+
} // namespace couchbase::core::impl

core/impl/lookup_in_all_replicas.hxx

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/* -*- Mode: C++; tab-width: 4; c-basic-offset: 4; indent-tabs-mode: nil -*- */
2+
/*
3+
* Copyright 2020-Present Couchbase, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#pragma once
19+
20+
#include <couchbase/lookup_in_all_replicas_options.hxx>
21+
#include <couchbase/lookup_in_replica_result.hxx>
22+
23+
#include "core/document_id.hxx"
24+
#include "core/error_context/key_value.hxx"
25+
#include "core/utils/movable_function.hxx"
26+
27+
#include <vector>
28+
29+
namespace couchbase::core::impl
30+
{
31+
32+
class lookup_in_all_replicas_request
33+
{
34+
public:
35+
explicit lookup_in_all_replicas_request(std::string bucket_name,
36+
std::string scope_name,
37+
std::string collection_name,
38+
std::string document_key,
39+
std::vector<couchbase::core::impl::subdoc::command> specs,
40+
std::optional<std::chrono::milliseconds> timeout)
41+
: id_{ std::move(bucket_name), std::move(scope_name), std::move(collection_name), std::move(document_key) }
42+
, specs_{ std::move(specs) }
43+
, timeout_{ timeout }
44+
{
45+
}
46+
47+
[[nodiscard]] const auto& id() const
48+
{
49+
return id_;
50+
}
51+
52+
[[nodiscard]] const auto& specs() const
53+
{
54+
return specs_;
55+
}
56+
57+
[[nodiscard]] const auto& timeout() const
58+
{
59+
return timeout_;
60+
}
61+
62+
private:
63+
core::document_id id_;
64+
std::vector<couchbase::core::impl::subdoc::command> specs_;
65+
std::optional<std::chrono::milliseconds> timeout_{};
66+
};
67+
68+
using movable_lookup_in_all_replicas_handler =
69+
utils::movable_function<void(couchbase::subdocument_error_context, lookup_in_all_replicas_result)>;
70+
71+
void
72+
initiate_lookup_in_all_replicas_operation(std::shared_ptr<cluster> core,
73+
const std::string& bucket_name,
74+
const std::string& scope_name,
75+
const std::string& collection_name,
76+
std::string document_key,
77+
const std::vector<subdoc::command>& specs,
78+
std::optional<std::chrono::milliseconds> timeout,
79+
movable_lookup_in_all_replicas_handler&& handler);
80+
} // namespace couchbase::core::impl

0 commit comments

Comments
 (0)