Skip to content

Commit c7eef78

Browse files
Review
1 parent 18425c1 commit c7eef78

File tree

9 files changed

+175
-161
lines changed

9 files changed

+175
-161
lines changed

packages/firestore/src/core/component_provider.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { EventManager } from './event_manager';
2828
import { AsyncQueue } from '../util/async_queue';
2929
import { DatabaseInfo } from './database_info';
3030
import { Platform } from '../platform/platform';
31+
import { Datastore } from '../remote/datastore';
3132
import { User } from '../auth/user';
3233
import { PersistenceSettings } from './firestore_client';
3334
import { debugAssert } from '../util/assert';
@@ -41,9 +42,6 @@ import {
4142
MemoryEagerDelegate,
4243
MemoryPersistence
4344
} from '../local/memory_persistence';
44-
import { Connection } from '../remote/connection';
45-
import { CredentialsProvider } from '../api/credentials';
46-
import { JsonProtoSerializer } from '../remote/serializer';
4745

4846
const MEMORY_ONLY_PERSISTENCE_ERROR_MESSAGE =
4947
'You are using the memory-only build of Firestore. Persistence support is ' +
@@ -54,9 +52,7 @@ export interface ComponentConfiguration {
5452
asyncQueue: AsyncQueue;
5553
databaseInfo: DatabaseInfo;
5654
platform: Platform;
57-
connection: Connection;
58-
credentials: CredentialsProvider;
59-
serializer: JsonProtoSerializer;
55+
datastore: Datastore;
6056
clientId: ClientId;
6157
initialUser: User;
6258
maxConcurrentLimboResolutions: number;
@@ -147,9 +143,7 @@ export class MemoryComponentProvider implements ComponentProvider {
147143
createRemoteStore(cfg: ComponentConfiguration): RemoteStore {
148144
return new RemoteStore(
149145
this.localStore,
150-
cfg.connection,
151-
cfg.credentials,
152-
cfg.serializer,
146+
cfg.datastore,
153147
cfg.asyncQueue,
154148
onlineState =>
155149
this.syncEngine.applyOnlineStateChange(

packages/firestore/src/core/firestore_client.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import { Document, MaybeDocument, NoDocument } from '../model/document';
2323
import { DocumentKey } from '../model/document_key';
2424
import { Mutation } from '../model/mutation';
2525
import { Platform } from '../platform/platform';
26+
import { Datastore } from '../remote/datastore';
2627
import { RemoteStore } from '../remote/remote_store';
2728
import { AsyncQueue } from '../util/async_queue';
2829
import { Code, FirestoreError } from '../util/error';
@@ -237,14 +238,13 @@ export class FirestoreClient {
237238
const serializer = this.platform.newSerializer(
238239
this.databaseInfo.databaseId
239240
);
241+
const datastore = new Datastore(connection, this.credentials, serializer);
240242

241243
await componentProvider.initialize({
242244
asyncQueue: this.asyncQueue,
243245
databaseInfo: this.databaseInfo,
244246
platform: this.platform,
245-
connection,
246-
credentials: this.credentials,
247-
serializer,
247+
datastore,
248248
clientId: this.clientId,
249249
initialUser: user,
250250
maxConcurrentLimboResolutions: MAX_CONCURRENT_LIMBO_RESOLUTIONS,

packages/firestore/src/core/transaction.ts

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,11 @@ import {
2626
Precondition,
2727
VerifyMutation
2828
} from '../model/mutation';
29-
import { Datastore } from '../remote/datastore';
29+
import {
30+
Datastore,
31+
invokeBatchGetDocumentsRpc,
32+
invokeCommitRpc
33+
} from '../remote/datastore';
3034
import { fail, debugAssert } from '../util/assert';
3135
import { Code, FirestoreError } from '../util/error';
3236
import { SnapshotVersion } from './snapshot_version';
@@ -66,7 +70,7 @@ export class Transaction {
6670
'Firestore transactions require all reads to be executed before all writes.'
6771
);
6872
}
69-
const docs = await this.datastore.lookup(keys);
73+
const docs = await invokeBatchGetDocumentsRpc(this.datastore, keys);
7074
docs.forEach(doc => {
7175
if (doc instanceof NoDocument || doc instanceof Document) {
7276
this.recordVersion(doc);
@@ -112,7 +116,7 @@ export class Transaction {
112116
unwritten.forEach((key, _version) => {
113117
this.mutations.push(new VerifyMutation(key, this.precondition(key)));
114118
});
115-
await this.datastore.commit(this.mutations);
119+
await invokeCommitRpc(this.datastore, this.mutations);
116120
this.committed = true;
117121
}
118122

packages/firestore/src/remote/datastore.ts

Lines changed: 88 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,14 @@ import * as api from '../protos/firestore_proto_api';
2323
import { hardAssert } from '../util/assert';
2424
import { Code, FirestoreError } from '../util/error';
2525
import { Connection } from './connection';
26-
2726
import { JsonProtoSerializer } from './serializer';
27+
import {
28+
PersistentListenStream,
29+
PersistentWriteStream,
30+
WatchStreamListener,
31+
WriteStreamListener
32+
} from './persistent_stream';
33+
import { AsyncQueue } from '../util/async_queue';
2834

2935
// The generated proto interfaces for these class are missing the database
3036
// field. So we add it here.
@@ -43,53 +49,13 @@ interface CommitRequest extends api.CommitRequest {
4349
*/
4450
export class Datastore {
4551
constructor(
46-
private connection: Connection,
47-
private credentials: CredentialsProvider,
48-
private serializer: JsonProtoSerializer
52+
public readonly connection: Connection,
53+
public readonly credentials: CredentialsProvider,
54+
public readonly serializer: JsonProtoSerializer
4955
) {}
5056

51-
commit(mutations: Mutation[]): Promise<MutationResult[]> {
52-
const params: CommitRequest = {
53-
database: this.serializer.encodedDatabaseId,
54-
writes: mutations.map(m => this.serializer.toMutation(m))
55-
};
56-
return this.invokeRPC<CommitRequest, api.CommitResponse>(
57-
'Commit',
58-
params
59-
).then(response => {
60-
return this.serializer.fromWriteResults(
61-
response.writeResults,
62-
response.commitTime
63-
);
64-
});
65-
}
66-
67-
lookup(keys: DocumentKey[]): Promise<MaybeDocument[]> {
68-
const params: BatchGetDocumentsRequest = {
69-
database: this.serializer.encodedDatabaseId,
70-
documents: keys.map(k => this.serializer.toName(k))
71-
};
72-
return this.invokeStreamingRPC<
73-
BatchGetDocumentsRequest,
74-
api.BatchGetDocumentsResponse
75-
>('BatchGetDocuments', params).then(response => {
76-
const docs = new Map<string, MaybeDocument>();
77-
response.forEach(proto => {
78-
const doc = this.serializer.fromMaybeDocument(proto);
79-
docs.set(doc.key.toString(), doc);
80-
});
81-
const result: MaybeDocument[] = [];
82-
keys.forEach(key => {
83-
const doc = docs.get(key.toString());
84-
hardAssert(!!doc, 'Missing entity in write response for ' + key);
85-
result.push(doc);
86-
});
87-
return result;
88-
});
89-
}
90-
9157
/** Gets an auth token and invokes the provided RPC. */
92-
private invokeRPC<Req, Resp>(rpcName: string, request: Req): Promise<Resp> {
58+
invokeRPC<Req, Resp>(rpcName: string, request: Req): Promise<Resp> {
9359
return this.credentials
9460
.getToken()
9561
.then(token => {
@@ -104,7 +70,7 @@ export class Datastore {
10470
}
10571

10672
/** Gets an auth token and invokes the provided RPC with streamed results. */
107-
private invokeStreamingRPC<Req, Resp>(
73+
invokeStreamingRPC<Req, Resp>(
10874
rpcName: string,
10975
request: Req
11076
): Promise<Resp[]> {
@@ -125,3 +91,79 @@ export class Datastore {
12591
});
12692
}
12793
}
94+
95+
export function invokeCommitRpc(
96+
datastore: Datastore,
97+
mutations: Mutation[]
98+
): Promise<MutationResult[]> {
99+
const params: CommitRequest = {
100+
database: datastore.serializer.encodedDatabaseId,
101+
writes: mutations.map(m => datastore.serializer.toMutation(m))
102+
};
103+
return datastore
104+
.invokeRPC<CommitRequest, api.CommitResponse>('Commit', params)
105+
.then(response => {
106+
return datastore.serializer.fromWriteResults(
107+
response.writeResults,
108+
response.commitTime
109+
);
110+
});
111+
}
112+
113+
export function invokeBatchGetDocumentsRpc(
114+
datastore: Datastore,
115+
keys: DocumentKey[]
116+
): Promise<MaybeDocument[]> {
117+
const params: BatchGetDocumentsRequest = {
118+
database: datastore.serializer.encodedDatabaseId,
119+
documents: keys.map(k => datastore.serializer.toName(k))
120+
};
121+
122+
return datastore
123+
.invokeStreamingRPC<
124+
BatchGetDocumentsRequest,
125+
api.BatchGetDocumentsResponse
126+
>('BatchGetDocuments', params)
127+
.then(response => {
128+
const docs = new Map<string, MaybeDocument>();
129+
response.forEach(proto => {
130+
const doc = datastore.serializer.fromMaybeDocument(proto);
131+
docs.set(doc.key.toString(), doc);
132+
});
133+
const result: MaybeDocument[] = [];
134+
keys.forEach(key => {
135+
const doc = docs.get(key.toString());
136+
hardAssert(!!doc, 'Missing entity in write response for ' + key);
137+
result.push(doc);
138+
});
139+
return result;
140+
});
141+
}
142+
143+
export function newPersistentWriteStream(
144+
datastore: Datastore,
145+
queue: AsyncQueue,
146+
listener: WriteStreamListener
147+
): PersistentWriteStream {
148+
return new PersistentWriteStream(
149+
queue,
150+
datastore.connection,
151+
datastore.credentials,
152+
datastore.serializer,
153+
listener
154+
);
155+
}
156+
157+
export function newPersistentWatchStream(
158+
datastore: Datastore,
159+
queue: AsyncQueue,
160+
listener: WatchStreamListener
161+
): PersistentListenStream {
162+
return new PersistentListenStream(
163+
queue,
164+
datastore.connection,
165+
datastore.credentials,
166+
datastore.serializer,
167+
listener
168+
);
169+
}

packages/firestore/src/remote/remote_store.ts

Lines changed: 21 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ import { logDebug } from '../util/log';
3232
import { DocumentKeySet } from '../model/collections';
3333
import { AsyncQueue } from '../util/async_queue';
3434
import { ConnectivityMonitor, NetworkStatus } from './connectivity_monitor';
35-
import { Datastore } from './datastore';
35+
import {
36+
Datastore,
37+
newPersistentWatchStream,
38+
newPersistentWriteStream
39+
} from './datastore';
3640
import { OnlineStateTracker } from './online_state_tracker';
3741
import {
3842
PersistentListenStream,
@@ -50,9 +54,6 @@ import {
5054
WatchTargetChangeState
5155
} from './watch_change';
5256
import { ByteString } from '../util/byte_string';
53-
import { Connection } from './connection';
54-
import { CredentialsProvider } from '../api/credentials';
55-
import { JsonProtoSerializer } from './serializer';
5657

5758
const LOG_TAG = 'RemoteStore';
5859

@@ -109,7 +110,6 @@ export class RemoteStore implements TargetMetadataProvider {
109110
*/
110111
private listenTargets = new Map<TargetId, TargetData>();
111112

112-
private datastore: Datastore;
113113
private connectivityMonitor: ConnectivityMonitor;
114114
private watchStream: PersistentListenStream;
115115
private writeStream: PersistentWriteStream;
@@ -126,10 +126,12 @@ export class RemoteStore implements TargetMetadataProvider {
126126
private onlineStateTracker: OnlineStateTracker;
127127

128128
constructor(
129+
/**
130+
* The local store, used to fill the write pipeline with outbound mutations.
131+
*/
129132
private localStore: LocalStore,
130-
connection: Connection,
131-
credentials: CredentialsProvider,
132-
serializer: JsonProtoSerializer,
133+
/** The client-side proxy for interacting with the backend. */
134+
private datastore: Datastore,
133135
asyncQueue: AsyncQueue,
134136
onlineStateHandler: (onlineState: OnlineState) => void,
135137
connectivityMonitor: ConnectivityMonitor
@@ -152,33 +154,19 @@ export class RemoteStore implements TargetMetadataProvider {
152154
onlineStateHandler
153155
);
154156

155-
this.datastore = new Datastore(connection, credentials, serializer);
156-
157157
// Create streams (but note they're not started yet).
158-
this.watchStream = new PersistentListenStream(
159-
asyncQueue,
160-
connection,
161-
credentials,
162-
serializer,
163-
{
164-
onOpen: this.onWatchStreamOpen.bind(this),
165-
onClose: this.onWatchStreamClose.bind(this),
166-
onWatchChange: this.onWatchStreamChange.bind(this)
167-
}
168-
);
158+
this.watchStream = newPersistentWatchStream(this.datastore, asyncQueue, {
159+
onOpen: this.onWatchStreamOpen.bind(this),
160+
onClose: this.onWatchStreamClose.bind(this),
161+
onWatchChange: this.onWatchStreamChange.bind(this)
162+
});
169163

170-
this.writeStream = new PersistentWriteStream(
171-
asyncQueue,
172-
connection,
173-
credentials,
174-
serializer,
175-
{
176-
onOpen: this.onWriteStreamOpen.bind(this),
177-
onClose: this.onWriteStreamClose.bind(this),
178-
onHandshakeComplete: this.onWriteHandshakeComplete.bind(this),
179-
onMutationResult: this.onMutationResult.bind(this)
180-
}
181-
);
164+
this.writeStream = newPersistentWriteStream(this.datastore, asyncQueue, {
165+
onOpen: this.onWriteStreamOpen.bind(this),
166+
onClose: this.onWriteStreamClose.bind(this),
167+
onHandshakeComplete: this.onWriteHandshakeComplete.bind(this),
168+
onMutationResult: this.onMutationResult.bind(this)
169+
});
182170
}
183171

184172
/**

0 commit comments

Comments
 (0)