Skip to content

Commit 1b400ff

Browse files
authored
gRPC: add more unit tests to Datastore and Stream (#1943)
1 parent 24b147d commit 1b400ff

16 files changed

+367
-80
lines changed

Firestore/core/src/firebase/firestore/remote/datastore.h

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,11 @@ class Datastore : public std::enable_shared_from_this<Datastore> {
7272
auth::CredentialsProvider* credentials,
7373
FSTSerializerBeta* serializer);
7474

75+
virtual ~Datastore() {
76+
}
77+
78+
/** Starts polling the gRPC completion queue. */
79+
void Start();
7580
/** Cancels any pending gRPC calls and drains the gRPC completion queue. */
7681
void Shutdown();
7782

@@ -101,6 +106,16 @@ class Datastore : public std::enable_shared_from_this<Datastore> {
101106
Datastore& operator=(const Datastore& other) = delete;
102107
Datastore& operator=(Datastore&& other) = delete;
103108

109+
protected:
110+
/** Test-only method */
111+
grpc::CompletionQueue* grpc_queue() {
112+
return &grpc_queue_;
113+
}
114+
/** Test-only method */
115+
GrpcCall* LastCall() {
116+
return !active_calls_.empty() ? active_calls_.back().get() : nullptr;
117+
}
118+
104119
private:
105120
void PollGrpcQueue();
106121

Firestore/core/src/firebase/firestore/remote/datastore.mm

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ void LogGrpcCallFinished(absl::string_view rpc_name,
9595
grpc_connection_{database_info, worker_queue, &grpc_queue_,
9696
connectivity_monitor_.get()},
9797
serializer_bridge_{NOT_NULL(serializer)} {
98+
}
99+
100+
void Datastore::Start() {
98101
rpc_executor_->Execute([this] { PollGrpcQueue(); });
99102
}
100103

Firestore/core/src/firebase/firestore/remote/grpc_call.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include <map>
2121

2222
#include "Firestore/core/src/firebase/firestore/util/status.h"
23+
#include "grpcpp/client_context.h"
2324
#include "grpcpp/support/string_ref.h"
2425

2526
namespace firebase {
@@ -55,6 +56,9 @@ class GrpcCall {
5556

5657
/** Finishes the call with an error, notifying any callbacks and observers. */
5758
virtual void FinishAndNotify(const util::Status& status) = 0;
59+
60+
/** For tests only */
61+
virtual grpc::ClientContext* context() = 0;
5862
};
5963

6064
} // namespace remote

Firestore/core/src/firebase/firestore/remote/grpc_stream.cc

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ GrpcStream::GrpcStream(
9898
GrpcStream::~GrpcStream() {
9999
HARD_ASSERT(completions_.empty(),
100100
"GrpcStream is being destroyed without proper shutdown");
101+
MaybeUnregister();
101102
}
102103

103104
void GrpcStream::Start() {
@@ -172,11 +173,7 @@ void GrpcStream::FinishAndNotify(const Status& status) {
172173
}
173174

174175
void GrpcStream::Shutdown() {
175-
if (grpc_connection_) {
176-
grpc_connection_->Unregister(this);
177-
grpc_connection_ = nullptr;
178-
}
179-
176+
MaybeUnregister();
180177
if (completions_.empty()) {
181178
// Nothing to cancel.
182179
return;
@@ -202,6 +199,13 @@ void GrpcStream::Shutdown() {
202199
FastFinishCompletionsBlocking();
203200
}
204201

202+
void GrpcStream::MaybeUnregister() {
203+
if (grpc_connection_) {
204+
grpc_connection_->Unregister(this);
205+
grpc_connection_ = nullptr;
206+
}
207+
}
208+
205209
void GrpcStream::FastFinishCompletionsBlocking() {
206210
// TODO(varconst): reset buffered_writer_? Should not be necessary, because it
207211
// should never be called again after a call to Finish.

Firestore/core/src/firebase/firestore/remote/grpc_stream.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ class GrpcStream : public GrpcCall {
173173
Metadata GetResponseHeaders() const override;
174174

175175
/** For tests only */
176-
grpc::ClientContext* context() {
176+
grpc::ClientContext* context() override {
177177
return context_.get();
178178
}
179179

@@ -185,6 +185,7 @@ class GrpcStream : public GrpcCall {
185185
void UnsetObserver() {
186186
observer_ = nullptr;
187187
}
188+
void MaybeUnregister();
188189

189190
void OnStart();
190191
void OnRead(const grpc::ByteBuffer& message);

Firestore/core/src/firebase/firestore/remote/grpc_streaming_reader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ class GrpcStreamingReader : public GrpcCall, public GrpcStreamObserver {
8888
void OnStreamFinish(const util::Status& status) override;
8989

9090
/** For tests only */
91-
grpc::ClientContext* context() {
91+
grpc::ClientContext* context() override {
9292
return stream_->context();
9393
}
9494

Firestore/core/src/firebase/firestore/remote/grpc_unary_call.cc

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ GrpcUnaryCall::GrpcUnaryCall(
4646
GrpcUnaryCall::~GrpcUnaryCall() {
4747
HARD_ASSERT(!finish_completion_,
4848
"GrpcUnaryCall is being destroyed without proper shutdown");
49+
MaybeUnregister();
4950
}
5051

5152
void GrpcUnaryCall::Start(Callback&& callback) {
@@ -58,6 +59,8 @@ void GrpcUnaryCall::Start(Callback&& callback) {
5859
[this](bool /*ignored_ok*/, const GrpcCompletion* completion) {
5960
// Ignoring ok, status should contain all the relevant information.
6061
finish_completion_ = nullptr;
62+
Shutdown();
63+
6164
auto callback = std::move(callback_);
6265
if (completion->status()->ok()) {
6366
callback(*completion->message());
@@ -84,11 +87,7 @@ void GrpcUnaryCall::FinishAndNotify(const util::Status& status) {
8487
}
8588

8689
void GrpcUnaryCall::Shutdown() {
87-
if (grpc_connection_) {
88-
grpc_connection_->Unregister(this);
89-
grpc_connection_ = nullptr;
90-
}
91-
90+
MaybeUnregister();
9291
if (!finish_completion_) {
9392
// Nothing to cancel.
9493
return;
@@ -102,6 +101,13 @@ void GrpcUnaryCall::Shutdown() {
102101
finish_completion_ = nullptr;
103102
}
104103

104+
void GrpcUnaryCall::MaybeUnregister() {
105+
if (grpc_connection_) {
106+
grpc_connection_->Unregister(this);
107+
grpc_connection_ = nullptr;
108+
}
109+
}
110+
105111
GrpcCall::Metadata GrpcUnaryCall::GetResponseHeaders() const {
106112
return context_->GetServerInitialMetadata();
107113
}

Firestore/core/src/firebase/firestore/remote/grpc_unary_call.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,13 @@ class GrpcUnaryCall : public GrpcCall {
8282
Metadata GetResponseHeaders() const override;
8383

8484
/** For tests only */
85-
grpc::ClientContext* context() {
85+
grpc::ClientContext* context() override {
8686
return context_.get();
8787
}
8888

8989
private:
9090
void Shutdown();
91+
void MaybeUnregister();
9192

9293
// See comments in `GrpcStream` on lifetime issues for gRPC objects.
9394
std::unique_ptr<grpc::ClientContext> context_;

Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ class DatastoreSerializer {
161161
NSArray<FSTMaybeDocument*>* MergeLookupResponses(
162162
const std::vector<grpc::ByteBuffer>& responses,
163163
util::Status* out_status) const;
164-
FSTMaybeDocument* ToMaybeDocument(GCFSBatchGetDocumentsResponse*) const;
164+
FSTMaybeDocument* ToMaybeDocument(
165+
GCFSBatchGetDocumentsResponse* response) const;
165166

166167
FSTSerializerBeta* GetSerializer() {
167168
return serializer_;

Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.mm

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,10 +60,15 @@
6060
return output.str();
6161
}
6262

63-
NSData* ConvertToNsData(const grpc::ByteBuffer& buffer) {
63+
NSData* ConvertToNsData(const grpc::ByteBuffer& buffer, NSError** out_error) {
6464
std::vector<grpc::Slice> slices;
6565
grpc::Status status = buffer.Dump(&slices);
66-
HARD_ASSERT(status.ok(), "Trying to convert an invalid grpc::ByteBuffer");
66+
if (!status.ok()) {
67+
*out_error =
68+
MakeNSError(Status{FirestoreErrorCode::Internal,
69+
"Trying to convert an invalid grpc::ByteBuffer"});
70+
return nil;
71+
}
6772

6873
if (slices.size() == 1) {
6974
return [NSData dataWithBytes:slices.front().begin()
@@ -86,10 +91,13 @@
8691
template <typename Proto>
8792
Proto* ToProto(const grpc::ByteBuffer& message, Status* out_status) {
8893
NSError* error = nil;
89-
Proto* proto = [Proto parseFromData:ConvertToNsData(message) error:&error];
94+
NSData* data = ConvertToNsData(message, &error);
9095
if (!error) {
91-
*out_status = Status::OK();
92-
return proto;
96+
Proto* proto = [Proto parseFromData:data error:&error];
97+
if (!error) {
98+
*out_status = Status::OK();
99+
return proto;
100+
}
93101
}
94102

95103
std::string error_description = StringFormat(

0 commit comments

Comments
 (0)