Skip to content

Commit 0bf9c73

Browse files
authored
Implement bundle loading. (#3201)
1 parent 3a78d7c commit 0bf9c73

File tree

16 files changed

+1291
-115
lines changed

16 files changed

+1291
-115
lines changed

packages/firestore-types/index.d.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,40 @@ export class FirebaseFirestore {
9393

9494
terminate(): Promise<void>;
9595

96+
loadBundle(
97+
bundleData: ArrayBuffer | ReadableStream<ArrayBuffer> | string
98+
): LoadBundleTask;
99+
96100
INTERNAL: { delete: () => Promise<void> };
97101
}
98102

103+
export interface LoadBundleTask {
104+
onProgress(
105+
next?: (progress: LoadBundleTaskProgress) => any,
106+
error?: (error: Error) => any,
107+
complete?: () => void
108+
): void;
109+
110+
then<T, R>(
111+
onFulfilled?: (a: LoadBundleTaskProgress) => T | PromiseLike<T>,
112+
onRejected?: (a: Error) => R | PromiseLike<R>
113+
): Promise<T | R>;
114+
115+
catch<R>(
116+
onRejected: (a: Error) => R | PromiseLike<R>
117+
): Promise<R | LoadBundleTaskProgress>;
118+
}
119+
120+
export interface LoadBundleTaskProgress {
121+
documentsLoaded: number;
122+
totalDocuments: number;
123+
bytesLoaded: number;
124+
totalBytes: number;
125+
taskState: TaskState;
126+
}
127+
128+
export type TaskState = 'Error' | 'Running' | 'Success';
129+
99130
export class GeoPoint {
100131
constructor(latitude: number, longitude: number);
101132

packages/firestore/src/api/bundle.ts

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* @license
3+
* Copyright 2020 Google LLC
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
import * as firestore from '@firebase/firestore-types';
19+
import { Deferred } from '../util/promise';
20+
import { PartialObserver } from './observer';
21+
import { debugAssert } from '../util/assert';
22+
23+
export class LoadBundleTask
24+
implements
25+
firestore.LoadBundleTask,
26+
PromiseLike<firestore.LoadBundleTaskProgress> {
27+
private _progressObserver: PartialObserver<
28+
firestore.LoadBundleTaskProgress
29+
> = {};
30+
private _taskCompletionResolver = new Deferred<
31+
firestore.LoadBundleTaskProgress
32+
>();
33+
34+
private _lastProgress: firestore.LoadBundleTaskProgress = {
35+
taskState: 'Running',
36+
totalBytes: 0,
37+
totalDocuments: 0,
38+
bytesLoaded: 0,
39+
documentsLoaded: 0
40+
};
41+
42+
onProgress(
43+
next?: (progress: firestore.LoadBundleTaskProgress) => unknown,
44+
error?: (err: Error) => unknown,
45+
complete?: () => void
46+
): void {
47+
this._progressObserver = {
48+
next,
49+
error,
50+
complete
51+
};
52+
}
53+
54+
catch<R>(
55+
onRejected: (a: Error) => R | PromiseLike<R>
56+
): Promise<R | firestore.LoadBundleTaskProgress> {
57+
return this._taskCompletionResolver.promise.catch(onRejected);
58+
}
59+
60+
then<T, R>(
61+
onFulfilled?: (a: firestore.LoadBundleTaskProgress) => T | PromiseLike<T>,
62+
onRejected?: (a: Error) => R | PromiseLike<R>
63+
): Promise<T | R> {
64+
return this._taskCompletionResolver.promise.then(onFulfilled, onRejected);
65+
}
66+
67+
/**
68+
* Notifies all observers that bundle loading has completed, with a provided
69+
* `LoadBundleTaskProgress` object.
70+
*/
71+
_completeWith(progress: firestore.LoadBundleTaskProgress): void {
72+
debugAssert(
73+
progress.taskState === 'Success',
74+
'Task is not completed with Success.'
75+
);
76+
this._updateProgress(progress);
77+
if (this._progressObserver.complete) {
78+
this._progressObserver.complete();
79+
}
80+
81+
this._taskCompletionResolver.resolve(progress);
82+
}
83+
84+
/**
85+
* Notifies all observers that bundle loading has failed, with a provided
86+
* `Error` as the reason.
87+
*/
88+
_failWith(error: Error): void {
89+
this._lastProgress.taskState = 'Error';
90+
91+
if (this._progressObserver.next) {
92+
this._progressObserver.next(this._lastProgress);
93+
}
94+
95+
if (this._progressObserver.error) {
96+
this._progressObserver.error(error);
97+
}
98+
99+
this._taskCompletionResolver.reject(error);
100+
}
101+
102+
/**
103+
* Notifies a progress update of loading a bundle.
104+
* @param progress The new progress.
105+
*/
106+
_updateProgress(progress: firestore.LoadBundleTaskProgress): void {
107+
debugAssert(
108+
this._lastProgress.taskState === 'Running',
109+
'Cannot update progress on a completed or failed task'
110+
);
111+
112+
this._lastProgress = progress;
113+
if (this._progressObserver.next) {
114+
this._progressObserver.next(progress);
115+
}
116+
}
117+
}

packages/firestore/src/api/database.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,13 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
493493
};
494494
}
495495

496+
loadBundle(
497+
bundleData: ArrayBuffer | ReadableStream<Uint8Array> | string
498+
): firestore.LoadBundleTask {
499+
this.ensureClientConfigured();
500+
return this._firestoreClient!.loadBundle(bundleData);
501+
}
502+
496503
ensureClientConfigured(): FirestoreClient {
497504
if (!this._firestoreClient) {
498505
// Kick off starting the client but don't actually wait for it.

packages/firestore/src/core/bundle.ts

Lines changed: 134 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18+
import * as firestore from '@firebase/firestore-types';
1819
import { Query } from './query';
1920
import { SnapshotVersion } from './snapshot_version';
2021
import {
@@ -28,6 +29,14 @@ import * as api from '../protos/firestore_proto_api';
2829
import { DocumentKey } from '../model/document_key';
2930
import { MaybeDocument, NoDocument } from '../model/document';
3031
import { debugAssert } from '../util/assert';
32+
import {
33+
applyBundleDocuments,
34+
LocalStore,
35+
saveNamedQuery
36+
} from '../local/local_store';
37+
import { SizedBundleElement } from '../util/bundle_reader';
38+
import { MaybeDocumentMap } from '../model/collections';
39+
import { BundleMetadata } from '../protos/firestore_bundle_proto';
3140

3241
/**
3342
* Represents a Firestore bundle saved by the SDK in its local storage.
@@ -58,7 +67,7 @@ export interface NamedQuery {
5867
*/
5968
interface BundledDocument {
6069
metadata: bundleProto.BundledDocumentMetadata;
61-
document: api.Document | undefined;
70+
document?: api.Document;
6271
}
6372

6473
/**
@@ -98,3 +107,127 @@ export class BundleConverter {
98107
return fromVersion(time);
99108
}
100109
}
110+
111+
/**
112+
* Returns a `LoadBundleTaskProgress` representing the initial progress of
113+
* loading a bundle.
114+
*/
115+
export function bundleInitialProgress(
116+
metadata: BundleMetadata
117+
): firestore.LoadBundleTaskProgress {
118+
return {
119+
taskState: 'Running',
120+
documentsLoaded: 0,
121+
bytesLoaded: 0,
122+
totalDocuments: metadata.totalDocuments!,
123+
totalBytes: metadata.totalBytes!
124+
};
125+
}
126+
127+
/**
128+
* Returns a `LoadBundleTaskProgress` representing the progress that the loading
129+
* has succeeded.
130+
*/
131+
export function bundleSuccessProgress(
132+
metadata: BundleMetadata
133+
): firestore.LoadBundleTaskProgress {
134+
return {
135+
taskState: 'Success',
136+
documentsLoaded: metadata.totalDocuments!,
137+
bytesLoaded: metadata.totalBytes!,
138+
totalDocuments: metadata.totalDocuments!,
139+
totalBytes: metadata.totalBytes!
140+
};
141+
}
142+
143+
export class BundleLoadResult {
144+
constructor(
145+
readonly progress: firestore.LoadBundleTaskProgress,
146+
readonly changedDocs: MaybeDocumentMap
147+
) {}
148+
}
149+
150+
/**
151+
* A class to process the elements from a bundle, load them into local
152+
* storage and provide progress update while loading.
153+
*/
154+
export class BundleLoader {
155+
/** The current progress of loading */
156+
private progress: firestore.LoadBundleTaskProgress;
157+
/** Batched queries to be saved into storage */
158+
private queries: bundleProto.NamedQuery[] = [];
159+
/** Batched documents to be saved into storage */
160+
private documents: BundledDocuments = [];
161+
162+
constructor(
163+
private metadata: bundleProto.BundleMetadata,
164+
private localStore: LocalStore
165+
) {
166+
this.progress = bundleInitialProgress(metadata);
167+
}
168+
169+
/**
170+
* Adds an element from the bundle to the loader.
171+
*
172+
* Returns a new progress if adding the element leads to a new progress,
173+
* otherwise returns null.
174+
*/
175+
addSizedElement(
176+
element: SizedBundleElement
177+
): firestore.LoadBundleTaskProgress | null {
178+
debugAssert(!element.isBundleMetadata(), 'Unexpected bundle metadata.');
179+
180+
this.progress.bytesLoaded += element.byteLength;
181+
182+
let documentsLoaded = this.progress.documentsLoaded;
183+
184+
if (element.payload.namedQuery) {
185+
this.queries.push(element.payload.namedQuery);
186+
} else if (element.payload.documentMetadata) {
187+
this.documents.push({ metadata: element.payload.documentMetadata });
188+
if (!element.payload.documentMetadata.exists) {
189+
++documentsLoaded;
190+
}
191+
} else if (element.payload.document) {
192+
debugAssert(
193+
this.documents.length > 0 &&
194+
this.documents[this.documents.length - 1].metadata.name ===
195+
element.payload.document.name,
196+
'The document being added does not match the stored metadata.'
197+
);
198+
this.documents[this.documents.length - 1].document =
199+
element.payload.document;
200+
++documentsLoaded;
201+
}
202+
203+
if (documentsLoaded !== this.progress.documentsLoaded) {
204+
this.progress.documentsLoaded = documentsLoaded;
205+
return { ...this.progress };
206+
}
207+
208+
return null;
209+
}
210+
211+
/**
212+
* Update the progress to 'Success' and return the updated progress.
213+
*/
214+
async complete(): Promise<BundleLoadResult> {
215+
debugAssert(
216+
this.documents[this.documents.length - 1]?.metadata.exists !== true ||
217+
!!this.documents[this.documents.length - 1].document,
218+
'Bundled documents ends with a document metadata and missing document.'
219+
);
220+
221+
for (const q of this.queries) {
222+
await saveNamedQuery(this.localStore, q);
223+
}
224+
225+
const changedDocs = await applyBundleDocuments(
226+
this.localStore,
227+
this.documents
228+
);
229+
230+
this.progress.taskState = 'Success';
231+
return new BundleLoadResult({ ...this.progress }, changedDocs);
232+
}
233+
}

packages/firestore/src/core/firestore_client.ts

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* limitations under the License.
1616
*/
1717

18+
import * as firestore from '@firebase/firestore-types';
1819
import { CredentialsProvider } from '../api/credentials';
1920
import { User } from '../auth/user';
2021
import { LocalStore } from '../local/local_store';
@@ -26,15 +27,15 @@ import { newDatastore } from '../remote/datastore';
2627
import { RemoteStore } from '../remote/remote_store';
2728
import { AsyncQueue, wrapInUserErrorIfRecoverable } from '../util/async_queue';
2829
import { Code, FirestoreError } from '../util/error';
29-
import { logDebug } from '../util/log';
30+
import { logDebug, logWarn } from '../util/log';
3031
import { Deferred } from '../util/promise';
3132
import {
3233
EventManager,
3334
ListenOptions,
3435
Observer,
3536
QueryListener
3637
} from './event_manager';
37-
import { SyncEngine } from './sync_engine';
38+
import { SyncEngine, loadBundle } from './sync_engine';
3839
import { View } from './view';
3940

4041
import { SharedClientState } from '../local/shared_client_state';
@@ -47,8 +48,11 @@ import {
4748
ComponentProvider,
4849
MemoryComponentProvider
4950
} from './component_provider';
51+
import { BundleReader } from '../util/bundle_reader';
52+
import { LoadBundleTask } from '../api/bundle';
5053
import { newConnection } from '../platform/connection';
5154
import { newSerializer } from '../platform/serializer';
55+
import { toByteStreamReader } from '../platform/byte_stream_reader';
5256

5357
const LOG_TAG = 'FirestoreClient';
5458
const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
@@ -512,4 +516,27 @@ export class FirestoreClient {
512516
});
513517
return deferred.promise;
514518
}
519+
520+
loadBundle(
521+
data: ReadableStream<Uint8Array> | ArrayBuffer | string
522+
): firestore.LoadBundleTask {
523+
this.verifyNotTerminated();
524+
525+
let content: ReadableStream<Uint8Array> | ArrayBuffer;
526+
if (typeof data === 'string') {
527+
content = new TextEncoder().encode(data);
528+
} else {
529+
content = data;
530+
}
531+
const reader = new BundleReader(toByteStreamReader(content));
532+
const task = new LoadBundleTask();
533+
this.asyncQueue.enqueueAndForget(async () => {
534+
loadBundle(this.syncEngine, reader, task);
535+
return task.catch(e => {
536+
logWarn(LOG_TAG, `Loading bundle failed with ${e}`);
537+
});
538+
});
539+
540+
return task;
541+
}
515542
}

0 commit comments

Comments
 (0)