Skip to content

gRPC: add more unit tests for Stream and Datastore #1935

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Oct 14, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Firestore/Example/Firestore.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@
ABC1D7E42024AFDE00BA84F0 /* firebase_credentials_provider_test.mm in Sources */ = {isa = PBXBuildFile; fileRef = ABC1D7E22023CDC500BA84F0 /* firebase_credentials_provider_test.mm */; };
ABE6637A201FA81900ED349A /* database_id_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = AB71064B201FA60300344F18 /* database_id_test.cc */; };
ABF6506C201131F8005F2C74 /* timestamp_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = ABF6506B201131F8005F2C74 /* timestamp_test.cc */; };
B60894F72170207200EBC644 /* fake_credentials_provider.cc in Sources */ = {isa = PBXBuildFile; fileRef = B60894F62170207100EBC644 /* fake_credentials_provider.cc */; };
B6152AD7202A53CB000E5744 /* document_key_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B6152AD5202A5385000E5744 /* document_key_test.cc */; };
B65D34A9203C995B0076A5E1 /* FIRTimestampTest.m in Sources */ = {isa = PBXBuildFile; fileRef = B65D34A7203C99090076A5E1 /* FIRTimestampTest.m */; };
B66D8996213609EE0086DA0C /* stream_test.mm in Sources */ = {isa = PBXBuildFile; fileRef = B66D8995213609EE0086DA0C /* stream_test.mm */; };
Expand Down Expand Up @@ -515,6 +516,8 @@
ABF6506B201131F8005F2C74 /* timestamp_test.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = timestamp_test.cc; sourceTree = "<group>"; };
B1A7E1959AF8141FA7E6B888 /* grpc_stream_tester.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = grpc_stream_tester.cc; sourceTree = "<group>"; };
B3F5B3AAE791A5911B9EAA82 /* Pods-Firestore_Tests_iOS.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Firestore_Tests_iOS.release.xcconfig"; path = "Pods/Target Support Files/Pods-Firestore_Tests_iOS/Pods-Firestore_Tests_iOS.release.xcconfig"; sourceTree = "<group>"; };
B60894F52170207100EBC644 /* fake_credentials_provider.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = fake_credentials_provider.h; sourceTree = "<group>"; };
B60894F62170207100EBC644 /* fake_credentials_provider.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = fake_credentials_provider.cc; sourceTree = "<group>"; };
B6152AD5202A5385000E5744 /* document_key_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = document_key_test.cc; sourceTree = "<group>"; };
B65D34A7203C99090076A5E1 /* FIRTimestampTest.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = FIRTimestampTest.m; sourceTree = "<group>"; };
B66D8995213609EE0086DA0C /* stream_test.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = stream_test.mm; sourceTree = "<group>"; };
Expand Down Expand Up @@ -676,6 +679,8 @@
54740A561FC913EB00713A1A /* util */ = {
isa = PBXGroup;
children = (
B60894F62170207100EBC644 /* fake_credentials_provider.cc */,
B60894F52170207100EBC644 /* fake_credentials_provider.h */,
B67BF448216EB43000CA9097 /* create_noop_connectivity_monitor.cc */,
B67BF447216EB42F00CA9097 /* create_noop_connectivity_monitor.h */,
B6FB4680208EA0BE00554BA2 /* async_queue_libdispatch_test.mm */,
Expand Down Expand Up @@ -1829,6 +1834,7 @@
5467FB01203E5717009C9584 /* FIRFirestoreTests.mm in Sources */,
5492E052202154AB00B64F25 /* FIRGeoPointTests.mm in Sources */,
5492E059202154AB00B64F25 /* FIRQuerySnapshotTests.mm in Sources */,
B60894F72170207200EBC644 /* fake_credentials_provider.cc in Sources */,
5492E051202154AA00B64F25 /* FIRQueryTests.mm in Sources */,
5492E057202154AB00B64F25 /* FIRSnapshotMetadataTests.mm in Sources */,
B65D34A9203C995B0076A5E1 /* FIRTimestampTest.m in Sources */,
Expand Down
4 changes: 4 additions & 0 deletions Firestore/core/src/firebase/firestore/remote/datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class Datastore : public std::enable_shared_from_this<Datastore> {
static GrpcCall::Metadata ExtractWhitelistedHeaders(
const GrpcCall::Metadata& headers);

// In case Auth tries to invoke a callback after `Datastore` has been shut
// down.
bool is_shut_down_ = false;

util::AsyncQueue* worker_queue_ = nullptr;
auth::CredentialsProvider* credentials_ = nullptr;

Expand Down
7 changes: 7 additions & 0 deletions Firestore/core/src/firebase/firestore/remote/datastore.mm
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ void LogGrpcCallFinished(absl::string_view rpc_name,
}

void Datastore::Shutdown() {
is_shut_down_ = true;

// Order matters here: shutting down `grpc_connection_`, which will quickly
// finish any pending gRPC calls, must happen before shutting down the gRPC
// queue.
Expand Down Expand Up @@ -262,6 +264,11 @@ void LogGrpcCallFinished(absl::string_view rpc_name,
if (!strong_this) {
return;
}
// In case Auth callback is invoked after Datastore has been shut
// down.
if (strong_this->is_shut_down_) {
return;
}

on_credentials(result);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@

void GrpcConnection::Shutdown() {
// Fast finish any pending calls. This will not trigger the observers.
for (GrpcCall* call : active_calls_) {
// Calls may unregister themselves on finish, so make a protective copy.
auto active_calls = active_calls_;
for (GrpcCall* call : active_calls) {
call->FinishImmediately();
}
}
Expand Down Expand Up @@ -161,7 +163,7 @@
void GrpcConnection::RegisterConnectivityMonitor() {
connectivity_monitor_->AddCallback(
[this](ConnectivityMonitor::NetworkStatus /*ignored*/) {
// Calls may unregister themselves on cancel, so make a protective copy.
// Calls may unregister themselves on finish, so make a protective copy.
auto calls = active_calls_;
for (GrpcCall* call : calls) {
// This will trigger the observers.
Expand Down
88 changes: 69 additions & 19 deletions Firestore/core/test/firebase/firestore/remote/datastore_test.mm
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
#include <memory>
#include <string>

#include "Firestore/core/src/firebase/firestore/auth/empty_credentials_provider.h"
#include "Firestore/core/src/firebase/firestore/remote/datastore.h"
#include "Firestore/core/src/firebase/firestore/util/async_queue.h"
#include "Firestore/core/src/firebase/firestore/util/executor_libdispatch.h"
#include "Firestore/core/test/firebase/firestore/util/fake_credentials_provider.h"
#include "absl/memory/memory.h"
#include "gtest/gtest.h"

Expand All @@ -29,10 +29,10 @@
namespace remote {

using auth::CredentialsProvider;
using auth::EmptyCredentialsProvider;
using core::DatabaseInfo;
using model::DatabaseId;
using util::AsyncQueue;
using util::FakeCredentialsProvider;
using util::internal::ExecutorLibdispatch;

namespace {
Expand All @@ -47,11 +47,11 @@ void OnStreamFinish(const util::Status& status) override {
}
};

std::unique_ptr<Datastore> CreateDatastore(const DatabaseInfo& database_info,
AsyncQueue* async_queue,
std::shared_ptr<Datastore> CreateDatastore(const DatabaseInfo& database_info,
AsyncQueue* worker_queue,
CredentialsProvider* credentials) {
return absl::make_unique<Datastore>(
database_info, async_queue, credentials,
return std::make_shared<Datastore>(
database_info, worker_queue, credentials,
[[FSTSerializerBeta alloc]
initWithDatabaseID:&database_info.database_id()]);
}
Expand All @@ -61,32 +61,29 @@ void OnStreamFinish(const util::Status& status) override {
class DatastoreTest : public testing::Test {
public:
DatastoreTest()
: async_queue{absl::make_unique<ExecutorLibdispatch>(
: worker_queue{absl::make_unique<ExecutorLibdispatch>(
dispatch_queue_create("datastore_test", DISPATCH_QUEUE_SERIAL))},
database_info_{DatabaseId{"foo", "bar"}, "", "", false},
datastore{
CreateDatastore(database_info_, &async_queue, &credentials_)} {
database_info{DatabaseId{"foo", "bar"}, "", "", false},
datastore{CreateDatastore(database_info, &worker_queue, &credentials)} {
}

~DatastoreTest() {
if (!is_shut_down_) {
if (!is_shut_down) {
Shutdown();
}
}

void Shutdown() {
is_shut_down_ = true;
is_shut_down = true;
datastore->Shutdown();
}

private:
bool is_shut_down_ = false;
DatabaseInfo database_info_;
EmptyCredentialsProvider credentials_;
bool is_shut_down = false;
DatabaseInfo database_info;
FakeCredentialsProvider credentials;

public:
AsyncQueue async_queue;
std::unique_ptr<Datastore> datastore;
AsyncQueue worker_queue;
std::shared_ptr<Datastore> datastore;
};

TEST_F(DatastoreTest, CanShutdownWithNoOperations) {
Expand All @@ -113,6 +110,59 @@ void Shutdown() {
"x-google-service: service 2\n");
}

TEST_F(DatastoreTest, CommitMutationsAuthFailure) {
credentials.FailGetToken();

__block NSError* resulting_error = nullptr;
datastore->CommitMutations(@[], ^(NSError* _Nullable error) {
resulting_error = error;
});
worker_queue.EnqueueBlocking([] {});
EXPECT_NE(resulting_error, nullptr);
}

TEST_F(DatastoreTest, LookupDocumentsAuthFailure) {
credentials.FailGetToken();

__block NSError* resulting_error = nullptr;
datastore->LookupDocuments(
{}, ^(NSArray<FSTMaybeDocument*>* docs, NSError* _Nullable error) {
resulting_error = error;
});
worker_queue.EnqueueBlocking([] {});
EXPECT_NE(resulting_error, nullptr);
}

TEST_F(DatastoreTest, AuthAfterDatastoreHasBeenShutDown) {
credentials.DelayGetToken();

worker_queue.EnqueueBlocking([&] {
datastore->CommitMutations(@[], ^(NSError* _Nullable error) {
FAIL() << "Callback shouldn't be invoked";
});
});
Shutdown();

EXPECT_NO_THROW(credentials.InvokeGetToken());
}

// TODO(varconst): this test currently fails due to a gRPC issue, see here
// https://github.com/firebase/firebase-ios-sdk/pull/1935#discussion_r224900667
// for details. Reenable when/if possible.
TEST_F(DatastoreTest, DISABLED_AuthOutlivesDatastore) {
credentials.DelayGetToken();

worker_queue.EnqueueBlocking([&] {
datastore->CommitMutations(@[], ^(NSError* _Nullable error) {
FAIL() << "Callback shouldn't be invoked";
});
});
Shutdown();
datastore.reset();

EXPECT_NO_THROW(credentials.InvokeGetToken());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test currently fails, I'll fix before merging.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, this actually looks like a non-trivial issue.

The root of the problem is that there's an implicit dependency between grpc::CompletionQueue and grpc::ByteBuffer's lifetimes. The smallest repro is just:

{
  grpc::Slice slice{"foo"};
  grpc::ByteBuffer b{&slice, 1}; // Buffer must be non-empty
  grpc::CompletionQueue cq; // Assuming it's the only gRPC-related object around
} // Once the scope ends, assertion will be triggered, because cq was destroyed before b

Details:

In gRPC, C core is initialized once and shut down once. All C++ classes that need the core to be initialized inherit from GrpcLibraryCodegen. GrpcLibraryCodegen essentially makes C core reference-counted; each constructor increments, and each destructor decrements, the number of references to C core, and once the last reference is destroyed, the C core is shut down.

In this case, destroying Datastore destroys grpc::CompletionQueue, which happens to be the last reference to C core, so the line 154 (datastore.reset()) leads to global shutdown. The global shutdown, among other things, shuts down ExecCtx.

When EmptyCredentialsProvider::GetToken is called, the TokenListener (a std::function) is passed by value, so at the end of the call the destructor of TokenListener is called, which leads to the destruction of a lambda created by Datastore that contains a grpc::ByteBuffer. When a grpc::ByteBuffer is destroyed, it creates an ExecCtx, which fails because global shutdown has already been called on ExecCtx, leading to an assertion failure and a crash.

(Note that the fact that GetToken takes its argument by value isn't really an issue here; if the argument were taken by reference, the problem would surface when the credentials provider is destroyed. The root of the problem seems that the ByteBuffer-containing lambda may outlive gRPC core).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wilhuff Re. the above:

  1. As far as gRPC is concerned, do you feel it might be a bug? (and hence, worth reporting)
  2. I presume we care about this case (Auth outlives Firestore) and don't want a crash there -- let me know if I misunderstand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Submitted an issue to gRPC repo: grpc/grpc#16875

Copy link
Contributor

@wilhuff wilhuff Oct 15, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re (1): this could be a bug, but realistically I think it's one we'll have to work around. Possibly this means that we should not be using ByteBuffers for anything except directly sending into/out of gRPC calls such that the construction order you're describing never happens. However, it's also possible I'm misunderstanding, because it seems like we really shouldn't get into a state where we've destroyed the completion queue before the last byte buffer we might have submitted into it.

Re (2): I don't think the issue is that we care so much about auth outliving firestore as we want to handle races where Firestore may be asked to shutdown while an auth request is pending. We should not crash in this circumstance.

In our public API shutdown is asynchronous, so we could work around this by performing teardown in two passes: a first pass to quiesce the system, inhibiting new requests and waiting for any outstanding ones and then tearing things down.

Alternatively, for any request that might outlive the system add some way to disconnect it such that when it calls back it doesn't attempt any action on the already destroyed system.

}

} // namespace remote
} // namespace firestore
} // namespace firebase
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,41 @@ TEST_F(GrpcConnectionTest, ConnectivityChangeWithSeveralActiveCalls) {
EXPECT_EQ(changes_count, 3);
}

TEST_F(GrpcConnectionTest, ShutdownFastFinishesActiveCalls) {
class NoFinishObserver : public GrpcStreamObserver {
public:
void OnStreamStart() override {
}
void OnStreamRead(const grpc::ByteBuffer& message) override {
}
void OnStreamFinish(const util::Status& status) override {
FAIL() << "Observer shouldn't have been invoked";
}
};

NoFinishObserver observer;
std::unique_ptr<GrpcStream> foo = tester.CreateStream(&observer);
foo->Start();

std::unique_ptr<GrpcStreamingReader> bar = tester.CreateStreamingReader();
bar->Start([](const StatusOr<std::vector<grpc::ByteBuffer>>&) {
FAIL() << "Callback shouldn't have been invoked";
});

std::unique_ptr<GrpcUnaryCall> baz = tester.CreateUnaryCall();
baz->Start([](const StatusOr<grpc::ByteBuffer>&) {
FAIL() << "Callback shouldn't have been invoked";
});

tester.KeepPollingGrpcQueue();
worker_queue.EnqueueBlocking([&] { tester.grpc_connection()->Shutdown(); });

// Destroying a call will throw if it hasn't been properly shut down.
EXPECT_NO_THROW(foo.reset());
EXPECT_NO_THROW(bar.reset());
EXPECT_NO_THROW(baz.reset());
}

} // namespace remote
} // namespace firestore
} // namespace firebase
Loading