Skip to content

Commit f098fb2

Browse files
authored
C++ migration: implement Datastore::LookupDocuments (#1820)
This includes: * adding a new gRPC wrapper, `GrpcStreamingReader`. It is a wrapper over `GrpcStream` that does one write upon the start and then accumulates reads. * refactoring `GrpcStreamTester` to be a little more similar to `GrpcConnection`.
1 parent f982397 commit f098fb2

22 files changed

+757
-144
lines changed

Firestore/Example/Firestore.xcodeproj/project.pbxproj

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@
205205
B686F2B22025000D0028D6BE /* resource_path_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B686F2B02024FFD70028D6BE /* resource_path_test.cc */; };
206206
B6BBE43121262CF400C6A53E /* grpc_stream_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B6BBE42F21262CF400C6A53E /* grpc_stream_test.cc */; };
207207
B6D1B68520E2AB1B00B35856 /* exponential_backoff_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B6D1B68420E2AB1A00B35856 /* exponential_backoff_test.cc */; };
208+
B6D964932154AB8F00EB9CFB /* grpc_streaming_reader_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B6D964922154AB8F00EB9CFB /* grpc_streaming_reader_test.cc */; };
208209
B6FB467D208E9D3C00554BA2 /* async_queue_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B6FB467B208E9A8200554BA2 /* async_queue_test.cc */; };
209210
B6FB4684208EA0EC00554BA2 /* async_queue_libdispatch_test.mm in Sources */ = {isa = PBXBuildFile; fileRef = B6FB4680208EA0BE00554BA2 /* async_queue_libdispatch_test.mm */; };
210211
B6FB4685208EA0F000554BA2 /* async_queue_std_test.cc in Sources */ = {isa = PBXBuildFile; fileRef = B6FB4681208EA0BE00554BA2 /* async_queue_std_test.cc */; };
@@ -301,7 +302,6 @@
301302
2A0CF41BA5AED6049B0BEB2C /* type_traits_apple_test.mm */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.objcpp; path = type_traits_apple_test.mm; sourceTree = "<group>"; };
302303
2B50B3A0DF77100EEE887891 /* Pods_Firestore_Tests_iOS.framework */ = {isa = PBXFileReference; explicitFileType = wrapper.framework; includeInIndex = 0; path = Pods_Firestore_Tests_iOS.framework; sourceTree = BUILT_PRODUCTS_DIR; };
303304
332485C4DCC6BA0DBB5E31B7 /* leveldb_util_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = leveldb_util_test.cc; sourceTree = "<group>"; };
304-
3564F6872205C4F7001D5436 /* grpc_stream_tester.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = grpc_stream_tester.cc; sourceTree = "<group>"; };
305305
358C3B5FE573B1D60A4F7592 /* strerror_test.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = strerror_test.cc; sourceTree = "<group>"; };
306306
3B843E4A1F3930A400548890 /* remote_store_spec_test.json */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = text.json; path = remote_store_spec_test.json; sourceTree = "<group>"; };
307307
3C81DE3772628FE297055662 /* Pods-Firestore_Example_iOS.debug.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Firestore_Example_iOS.debug.xcconfig"; path = "Pods/Target Support Files/Pods-Firestore_Example_iOS/Pods-Firestore_Example_iOS.debug.xcconfig"; sourceTree = "<group>"; };
@@ -509,7 +509,7 @@
509509
ABC1D7DF2023A3EF00BA84F0 /* token_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = token_test.cc; sourceTree = "<group>"; };
510510
ABC1D7E22023CDC500BA84F0 /* firebase_credentials_provider_test.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = firebase_credentials_provider_test.mm; sourceTree = "<group>"; };
511511
ABF6506B201131F8005F2C74 /* timestamp_test.cc */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.cpp.cpp; path = timestamp_test.cc; sourceTree = "<group>"; };
512-
B1A7E1959AF8141FA7E6B888 /* grpc_stream_tester.cc */ = {isa = PBXFileReference; includeInIndex = 1; path = grpc_stream_tester.cc; sourceTree = "<group>"; };
512+
B1A7E1959AF8141FA7E6B888 /* grpc_stream_tester.cc */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = sourcecode.cpp.cpp; path = grpc_stream_tester.cc; sourceTree = "<group>"; };
513513
B3F5B3AAE791A5911B9EAA82 /* Pods-Firestore_Tests_iOS.release.xcconfig */ = {isa = PBXFileReference; includeInIndex = 1; lastKnownFileType = text.xcconfig; name = "Pods-Firestore_Tests_iOS.release.xcconfig"; path = "Pods/Target Support Files/Pods-Firestore_Tests_iOS/Pods-Firestore_Tests_iOS.release.xcconfig"; sourceTree = "<group>"; };
514514
B6152AD5202A5385000E5744 /* document_key_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = document_key_test.cc; sourceTree = "<group>"; };
515515
B65D34A7203C99090076A5E1 /* FIRTimestampTest.m */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.objc; path = FIRTimestampTest.m; sourceTree = "<group>"; };
@@ -518,6 +518,7 @@
518518
B686F2B02024FFD70028D6BE /* resource_path_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = resource_path_test.cc; sourceTree = "<group>"; };
519519
B6BBE42F21262CF400C6A53E /* grpc_stream_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = grpc_stream_test.cc; sourceTree = "<group>"; };
520520
B6D1B68420E2AB1A00B35856 /* exponential_backoff_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = exponential_backoff_test.cc; sourceTree = "<group>"; };
521+
B6D964922154AB8F00EB9CFB /* grpc_streaming_reader_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = grpc_streaming_reader_test.cc; sourceTree = "<group>"; };
521522
B6FB467A208E9A8200554BA2 /* async_queue_test.h */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.c.h; path = async_queue_test.h; sourceTree = "<group>"; };
522523
B6FB467B208E9A8200554BA2 /* async_queue_test.cc */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; path = async_queue_test.cc; sourceTree = "<group>"; };
523524
B6FB4680208EA0BE00554BA2 /* async_queue_libdispatch_test.mm */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.objcpp; path = async_queue_libdispatch_test.mm; sourceTree = "<group>"; };
@@ -652,6 +653,7 @@
652653
546854A720A3681B004BDBD5 /* remote */ = {
653654
isa = PBXGroup;
654655
children = (
656+
B6D964922154AB8F00EB9CFB /* grpc_streaming_reader_test.cc */,
655657
546854A820A36867004BDBD5 /* datastore_test.mm */,
656658
B6D1B68420E2AB1A00B35856 /* exponential_backoff_test.cc */,
657659
B6BBE42F21262CF400C6A53E /* grpc_stream_test.cc */,
@@ -1875,6 +1877,7 @@
18751877
B6FB4684208EA0EC00554BA2 /* async_queue_libdispatch_test.mm in Sources */,
18761878
B6FB4685208EA0F000554BA2 /* async_queue_std_test.cc in Sources */,
18771879
B6FB467D208E9D3C00554BA2 /* async_queue_test.cc in Sources */,
1880+
B6D964932154AB8F00EB9CFB /* grpc_streaming_reader_test.cc in Sources */,
18781881
54740A581FC914F000713A1A /* autoid_test.cc in Sources */,
18791882
AB380D02201BC69F00D97691 /* bits_test.cc in Sources */,
18801883
618BBEA920B89AAC00B5BCE7 /* common.pb.cc in Sources */,

Firestore/core/src/firebase/firestore/remote/datastore.h

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,63 @@
1717
#ifndef FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_REMOTE_DATASTORE_H_
1818
#define FIRESTORE_CORE_SRC_FIREBASE_FIRESTORE_REMOTE_DATASTORE_H_
1919

20+
#if !defined(__OBJC__)
21+
#error "This header only supports Objective-C++"
22+
#endif // !defined(__OBJC__)
23+
24+
#import <Foundation/Foundation.h>
25+
#include <functional>
2026
#include <memory>
2127
#include <string>
28+
#include <vector>
2229

30+
#include "Firestore/core/src/firebase/firestore/auth/credentials_provider.h"
31+
#include "Firestore/core/src/firebase/firestore/auth/token.h"
2332
#include "Firestore/core/src/firebase/firestore/core/database_info.h"
33+
#include "Firestore/core/src/firebase/firestore/model/document_key.h"
2434
#include "Firestore/core/src/firebase/firestore/remote/grpc_connection.h"
2535
#include "Firestore/core/src/firebase/firestore/remote/grpc_stream.h"
26-
#include "Firestore/core/src/firebase/firestore/remote/grpc_stream_observer.h"
36+
#include "Firestore/core/src/firebase/firestore/remote/grpc_streaming_reader.h"
37+
#include "Firestore/core/src/firebase/firestore/remote/remote_objc_bridge.h"
2738
#include "Firestore/core/src/firebase/firestore/remote/watch_stream.h"
2839
#include "Firestore/core/src/firebase/firestore/remote/write_stream.h"
2940
#include "Firestore/core/src/firebase/firestore/util/async_queue.h"
3041
#include "Firestore/core/src/firebase/firestore/util/executor.h"
3142
#include "Firestore/core/src/firebase/firestore/util/status.h"
43+
#include "Firestore/core/src/firebase/firestore/util/statusor.h"
3244
#include "absl/strings/string_view.h"
3345
#include "grpcpp/completion_queue.h"
3446
#include "grpcpp/support/status.h"
3547

48+
#import "Firestore/Source/Core/FSTTypes.h"
49+
#import "Firestore/Source/Remote/FSTSerializerBeta.h"
50+
#import "Firestore/Source/Remote/FSTStream.h"
51+
3652
namespace firebase {
3753
namespace firestore {
3854
namespace remote {
3955

40-
class Datastore {
56+
/**
57+
* `Datastore` represents a proxy for the remote server, hiding details of the
58+
* RPC layer. It:
59+
*
60+
* - Manages connections to the server
61+
* - Authenticates to the server
62+
* - Manages threading and keeps higher-level code running on the worker queue
63+
* - Serializes internal model objects to and from protocol buffers
64+
*
65+
* `Datastore` is generally not responsible for understanding the higher-level
66+
* protocol involved in actually making changes or reading data, and aside from
67+
* the connections it manages is otherwise stateless.
68+
*/
69+
class Datastore : public std::enable_shared_from_this<Datastore> {
4170
public:
4271
Datastore(const core::DatabaseInfo& database_info,
4372
util::AsyncQueue* worker_queue,
4473
auth::CredentialsProvider* credentials,
4574
FSTSerializerBeta* serializer);
4675

76+
/** Cancels any pending gRPC calls and drains the gRPC completion queue. */
4777
void Shutdown();
4878

4979
/**
@@ -59,6 +89,9 @@ class Datastore {
5989
std::shared_ptr<WriteStream> CreateWriteStream(
6090
id<FSTWriteStreamDelegate> delegate);
6191

92+
void LookupDocuments(const std::vector<model::DocumentKey>& keys,
93+
FSTVoidMaybeDocumentArrayErrorBlock completion);
94+
6295
static std::string GetWhitelistedHeadersAsString(
6396
const GrpcStream::MetadataT& headers);
6497

@@ -70,18 +103,36 @@ class Datastore {
70103
private:
71104
void PollGrpcQueue();
72105

106+
void LookupDocumentsWithCredentials(
107+
const auth::Token& token,
108+
const grpc::ByteBuffer& message,
109+
FSTVoidMaybeDocumentArrayErrorBlock completion);
110+
void OnLookupDocumentsResponse(
111+
GrpcStreamingReader* call,
112+
const util::StatusOr<std::vector<grpc::ByteBuffer>>& result,
113+
FSTVoidMaybeDocumentArrayErrorBlock completion);
114+
115+
using OnCredentials = std::function<void(const util::StatusOr<auth::Token>&)>;
116+
void ResumeRpcWithCredentials(const OnCredentials& on_token);
117+
118+
void HandleCallStatus(const util::Status& status);
119+
120+
void RemoveGrpcCall(GrpcStreamingReader* to_remove);
121+
73122
static GrpcStream::MetadataT ExtractWhitelistedHeaders(
74123
const GrpcStream::MetadataT& headers);
75124

76125
util::AsyncQueue* worker_queue_ = nullptr;
77126
auth::CredentialsProvider* credentials_ = nullptr;
78-
FSTSerializerBeta* serializer_ = nullptr;
79127

80128
// A separate executor dedicated to polling gRPC completion queue (which is
81-
// shared for all spawned `GrpcStream`s).
129+
// shared for all spawned gRPC streams and calls).
82130
std::unique_ptr<util::internal::Executor> rpc_executor_;
83131
grpc::CompletionQueue grpc_queue_;
84132
GrpcConnection grpc_connection_;
133+
134+
std::vector<std::unique_ptr<GrpcStreamingReader>> lookup_calls_;
135+
bridge::DatastoreSerializer serializer_bridge_;
85136
};
86137

87138
} // namespace remote

Firestore/core/src/firebase/firestore/remote/datastore.mm

Lines changed: 125 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,18 @@
1717
#include "Firestore/core/src/firebase/firestore/remote/datastore.h"
1818

1919
#include <unordered_set>
20+
#include <utility>
2021

2122
#include "Firestore/core/include/firebase/firestore/firestore_errors.h"
2223
#include "Firestore/core/src/firebase/firestore/auth/credentials_provider.h"
2324
#include "Firestore/core/src/firebase/firestore/auth/token.h"
2425
#include "Firestore/core/src/firebase/firestore/core/database_info.h"
2526
#include "Firestore/core/src/firebase/firestore/remote/grpc_completion.h"
27+
#include "Firestore/core/src/firebase/firestore/util/error_apple.h"
2628
#include "Firestore/core/src/firebase/firestore/util/executor_libdispatch.h"
2729
#include "Firestore/core/src/firebase/firestore/util/hard_assert.h"
30+
#include "Firestore/core/src/firebase/firestore/util/log.h"
31+
#include "Firestore/core/src/firebase/firestore/util/statusor.h"
2832
#include "absl/memory/memory.h"
2933
#include "absl/strings/str_cat.h"
3034

@@ -35,13 +39,18 @@
3539
using auth::CredentialsProvider;
3640
using auth::Token;
3741
using core::DatabaseInfo;
42+
using model::DocumentKey;
3843
using util::AsyncQueue;
3944
using util::Status;
45+
using util::StatusOr;
4046
using util::internal::Executor;
4147
using util::internal::ExecutorLibdispatch;
4248

4349
namespace {
4450

51+
const auto kRpcNameLookup =
52+
"/google.firestore.v1beta1.Firestore/BatchGetDocuments";
53+
4554
std::unique_ptr<Executor> CreateExecutor() {
4655
auto queue = dispatch_queue_create("com.google.firebase.firestore.rpc",
4756
DISPATCH_QUEUE_SERIAL);
@@ -56,6 +65,18 @@
5665
return {grpc_str.begin(), grpc_str.size()};
5766
}
5867

68+
void LogGrpcCallFinished(absl::string_view rpc_name,
69+
GrpcStreamingReader *call,
70+
const Status &status) {
71+
LOG_DEBUG("RPC %s completed. Error: %s: %s", rpc_name, status.code(),
72+
status.error_message());
73+
if (bridge::IsLoggingEnabled()) {
74+
auto headers =
75+
Datastore::GetWhitelistedHeadersAsString(call->GetResponseHeaders());
76+
LOG_DEBUG("RPC %s returned headers (whitelisted): %s", rpc_name, headers);
77+
}
78+
}
79+
5980
} // namespace
6081

6182
Datastore::Datastore(const DatabaseInfo &database_info,
@@ -66,11 +87,16 @@
6687
worker_queue_{worker_queue},
6788
credentials_{credentials},
6889
rpc_executor_{CreateExecutor()},
69-
serializer_{serializer} {
90+
serializer_bridge_{serializer} {
7091
rpc_executor_->Execute([this] { PollGrpcQueue(); });
7192
}
7293

7394
void Datastore::Shutdown() {
95+
for (auto &call : lookup_calls_) {
96+
call->Cancel();
97+
}
98+
lookup_calls_.clear();
99+
74100
// `grpc::CompletionQueue::Next` will only return `false` once `Shutdown` has
75101
// been called and all submitted tags have been extracted. Without this call,
76102
// `rpc_executor_` will never finish.
@@ -99,16 +125,112 @@
99125

100126
std::shared_ptr<WatchStream> Datastore::CreateWatchStream(
101127
id<FSTWatchStreamDelegate> delegate) {
102-
return std::make_shared<WatchStream>(worker_queue_, credentials_, serializer_,
128+
return std::make_shared<WatchStream>(worker_queue_, credentials_,
129+
serializer_bridge_.GetSerializer(),
103130
&grpc_connection_, delegate);
104131
}
105132

106133
std::shared_ptr<WriteStream> Datastore::CreateWriteStream(
107134
id<FSTWriteStreamDelegate> delegate) {
108-
return std::make_shared<WriteStream>(worker_queue_, credentials_, serializer_,
135+
return std::make_shared<WriteStream>(worker_queue_, credentials_,
136+
serializer_bridge_.GetSerializer(),
109137
&grpc_connection_, delegate);
110138
}
111139

140+
void Datastore::LookupDocuments(
141+
const std::vector<DocumentKey> &keys,
142+
FSTVoidMaybeDocumentArrayErrorBlock completion) {
143+
grpc::ByteBuffer message = serializer_bridge_.ToByteBuffer(
144+
serializer_bridge_.CreateLookupRequest(keys));
145+
146+
ResumeRpcWithCredentials(
147+
[this, message, completion](const StatusOr<Token> &maybe_credentials) {
148+
if (!maybe_credentials.ok()) {
149+
completion(nil, util::MakeNSError(maybe_credentials.status()));
150+
return;
151+
}
152+
LookupDocumentsWithCredentials(maybe_credentials.ValueOrDie(), message,
153+
completion);
154+
});
155+
}
156+
157+
void Datastore::LookupDocumentsWithCredentials(
158+
const Token &token,
159+
const grpc::ByteBuffer &message,
160+
FSTVoidMaybeDocumentArrayErrorBlock completion) {
161+
lookup_calls_.push_back(grpc_connection_.CreateStreamingReader(
162+
kRpcNameLookup, token, std::move(message)));
163+
GrpcStreamingReader *call = lookup_calls_.back().get();
164+
165+
call->Start([this, call, completion](
166+
const StatusOr<std::vector<grpc::ByteBuffer>> &result) {
167+
OnLookupDocumentsResponse(call, result, completion);
168+
RemoveGrpcCall(call);
169+
});
170+
}
171+
172+
void Datastore::OnLookupDocumentsResponse(
173+
GrpcStreamingReader *call,
174+
const StatusOr<std::vector<grpc::ByteBuffer>> &result,
175+
FSTVoidMaybeDocumentArrayErrorBlock completion) {
176+
LogGrpcCallFinished("BatchGetDocuments", call, result.status());
177+
HandleCallStatus(result.status());
178+
179+
if (!result.ok()) {
180+
completion(nil, util::MakeNSError(result.status()));
181+
return;
182+
}
183+
184+
Status parse_status;
185+
std::vector<grpc::ByteBuffer> responses = std::move(result).ValueOrDie();
186+
NSArray<FSTMaybeDocument *> *docs =
187+
serializer_bridge_.MergeLookupResponses(responses, &parse_status);
188+
if (parse_status.ok()) {
189+
completion(docs, nil);
190+
} else {
191+
completion(nil, util::MakeNSError(parse_status));
192+
}
193+
}
194+
195+
void Datastore::ResumeRpcWithCredentials(const OnCredentials &on_credentials) {
196+
// Auth may outlive Firestore
197+
std::weak_ptr<Datastore> weak_this{shared_from_this()};
198+
199+
credentials_->GetToken(
200+
[weak_this, on_credentials](const StatusOr<Token> &result) {
201+
auto strong_this = weak_this.lock();
202+
if (!strong_this) {
203+
return;
204+
}
205+
206+
strong_this->worker_queue_->EnqueueRelaxed(
207+
[weak_this, result, on_credentials] {
208+
auto strong_this = weak_this.lock();
209+
if (!strong_this) {
210+
return;
211+
}
212+
213+
on_credentials(result);
214+
});
215+
});
216+
}
217+
218+
void Datastore::HandleCallStatus(const Status &status) {
219+
if (status.code() == FirestoreErrorCode::Unauthenticated) {
220+
credentials_->InvalidateToken();
221+
}
222+
}
223+
224+
void Datastore::RemoveGrpcCall(GrpcStreamingReader *to_remove) {
225+
auto found = std::find_if(
226+
lookup_calls_.begin(), lookup_calls_.end(),
227+
[to_remove](const std::unique_ptr<GrpcStreamingReader> &call) {
228+
return call.get() == to_remove;
229+
});
230+
HARD_ASSERT(found != lookup_calls_.end(), "Missing gRPC call");
231+
lookup_calls_.erase(found);
232+
}
233+
112234
std::string Datastore::GetWhitelistedHeadersAsString(
113235
const GrpcStream::MetadataT &headers) {
114236
static std::unordered_set<std::string> whitelist = {

Firestore/core/src/firebase/firestore/remote/grpc_connection.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "Firestore/core/src/firebase/firestore/core/database_info.h"
2424
#include "Firestore/core/src/firebase/firestore/remote/grpc_stream.h"
2525
#include "Firestore/core/src/firebase/firestore/remote/grpc_stream_observer.h"
26+
#include "Firestore/core/src/firebase/firestore/remote/grpc_streaming_reader.h"
2627
#include "absl/strings/string_view.h"
2728
#include "grpcpp/channel.h"
2829
#include "grpcpp/client_context.h"
@@ -57,6 +58,11 @@ class GrpcConnection {
5758
const auth::Token& token,
5859
GrpcStreamObserver* observer);
5960

61+
std::unique_ptr<GrpcStreamingReader> CreateStreamingReader(
62+
absl::string_view rpc_name,
63+
const auth::Token& token,
64+
const grpc::ByteBuffer& message);
65+
6066
private:
6167
std::unique_ptr<grpc::ClientContext> CreateContext(
6268
const auth::Token& credential) const;

Firestore/core/src/firebase/firestore/remote/grpc_connection.mm

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,21 @@
118118
observer, worker_queue_);
119119
}
120120

121+
std::unique_ptr<GrpcStreamingReader> GrpcConnection::CreateStreamingReader(
122+
absl::string_view rpc_name,
123+
const Token &token,
124+
const grpc::ByteBuffer &message) {
125+
LOG_DEBUG("Creating gRPC streaming reader");
126+
127+
EnsureActiveStub();
128+
129+
auto context = CreateContext(token);
130+
auto call =
131+
grpc_stub_->PrepareCall(context.get(), MakeString(rpc_name), grpc_queue_);
132+
return absl::make_unique<GrpcStreamingReader>(
133+
std::move(context), std::move(call), worker_queue_, message);
134+
}
135+
121136
} // namespace remote
122137
} // namespace firestore
123138
} // namespace firebase

0 commit comments

Comments
 (0)