Skip to content

Commit 302f15c

Browse files
Merge
2 parents 15ad9fc + 0465d2f commit 302f15c

12 files changed

+244
-173
lines changed

packages/firestore/src/core/firestore_client.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,8 @@ export class FirestoreClient {
400400
// TODO(index-free): Use IndexFreeQueryEngine/IndexedQueryEngine as appropriate.
401401
const queryEngine = new SimpleQueryEngine();
402402
this.localStore = new LocalStore(this.persistence, queryEngine, user);
403+
await this.localStore.start();
404+
403405
if (maybeLruGc) {
404406
// We're running LRU Garbage collection. Set up the scheduler.
405407
this.lruScheduler = new LruScheduler(

packages/firestore/src/local/indexeddb_persistence.ts

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -317,15 +317,12 @@ export class IndexedDbPersistence implements Persistence {
317317

318318
this.scheduleClientMetadataAndPrimaryLeaseRefreshes();
319319

320-
return this.startRemoteDocumentCache();
321-
})
322-
.then(() =>
323-
this.simpleDb.runTransaction(
320+
return this.simpleDb.runTransaction(
324321
'readonly-idempotent',
325322
[DbTargetGlobal.store],
326323
txn => getHighestListenSequenceNumber(txn)
327-
)
328-
)
324+
);
325+
})
329326
.then(highestListenSequenceNumber => {
330327
this.listenSequence = new ListenSequence(
331328
highestListenSequenceNumber,
@@ -341,12 +338,6 @@ export class IndexedDbPersistence implements Persistence {
341338
});
342339
}
343340

344-
private startRemoteDocumentCache(): Promise<void> {
345-
return this.simpleDb.runTransaction('readonly', ALL_STORES, txn =>
346-
this.remoteDocumentCache.start(txn)
347-
);
348-
}
349-
350341
setPrimaryStateListener(
351342
primaryStateListener: PrimaryStateListener
352343
): Promise<void> {
@@ -769,10 +760,14 @@ export class IndexedDbPersistence implements Persistence {
769760
this.listenSequence.next()
770761
);
771762

763+
<<<<<<< HEAD
772764
if (
773765
mode === 'readwrite-primary' ||
774766
mode === 'readwrite-primary-idempotent'
775767
) {
768+
=======
769+
if (mode === 'readwrite-primary') {
770+
>>>>>>> mrschmidt/idempotent
776771
// While we merely verify that we have (or can acquire) the lease
777772
// immediately, we wait to extend the primary lease until after
778773
// executing transactionOperation(). This ensures that even if the

packages/firestore/src/local/indexeddb_remote_document_cache.ts

Lines changed: 48 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,10 @@ import { PersistenceTransaction } from './persistence';
4949
import { PersistencePromise } from './persistence_promise';
5050
import { RemoteDocumentCache } from './remote_document_cache';
5151
import { RemoteDocumentChangeBuffer } from './remote_document_change_buffer';
52-
import {
53-
IterateOptions,
54-
SimpleDb,
55-
SimpleDbStore,
56-
SimpleDbTransaction
57-
} from './simple_db';
52+
import { IterateOptions, SimpleDbStore } from './simple_db';
5853
import { ObjectMap } from '../util/obj_map';
5954

6055
export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
61-
/** The read time of the last entry consumed by `getNewDocumentChanges()`. */
62-
private lastProcessedReadTime = SnapshotVersion.MIN;
63-
6456
/**
6557
* @param {LocalSerializer} serializer The document serializer.
6658
* @param {IndexManager} indexManager The query indexes that need to be maintained.
@@ -70,18 +62,6 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
7062
private readonly indexManager: IndexManager
7163
) {}
7264

73-
/**
74-
* Starts up the remote document cache.
75-
*
76-
* Reads the ID of the last document change from the documentChanges store.
77-
* Existing changes will not be returned as part of
78-
* `getNewDocumentChanges()`.
79-
*/
80-
// PORTING NOTE: This is only used for multi-tab synchronization.
81-
start(transaction: SimpleDbTransaction): PersistencePromise<void> {
82-
return this.synchronizeLastProcessedReadTime(transaction);
83-
}
84-
8565
/**
8666
* Adds the supplied entries to the cache.
8767
*
@@ -311,14 +291,21 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
311291
.next(() => results);
312292
}
313293

294+
/**
295+
* Returns the set of documents that have been updated since the specified read
296+
* time.
297+
*/
298+
// PORTING NOTE: This is only used for multi-tab synchronization.
314299
getNewDocumentChanges(
315-
transaction: PersistenceTransaction
316-
): PersistencePromise<MaybeDocumentMap> {
300+
transaction: PersistenceTransaction,
301+
sinceReadTime: SnapshotVersion
302+
): PersistencePromise<{
303+
changedDocs: MaybeDocumentMap;
304+
readTime: SnapshotVersion;
305+
}> {
317306
let changedDocs = maybeDocumentMap();
318307

319-
const lastReadTime = this.serializer.toDbTimestampKey(
320-
this.lastProcessedReadTime
321-
);
308+
let lastReadTime = this.serializer.toDbTimestampKey(sinceReadTime);
322309

323310
const documentsStore = remoteDocumentsStore(transaction);
324311
const range = IDBKeyRange.lowerBound(lastReadTime, true);
@@ -330,40 +317,48 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
330317
// the documents directly since we want to keep sentinel deletes.
331318
const doc = this.serializer.fromDbRemoteDocument(dbRemoteDoc);
332319
changedDocs = changedDocs.insert(doc.key, doc);
333-
this.lastProcessedReadTime = this.serializer.fromDbTimestampKey(
334-
dbRemoteDoc.readTime!
335-
);
320+
lastReadTime = dbRemoteDoc.readTime!;
336321
}
337322
)
338-
.next(() => changedDocs);
323+
.next(() => {
324+
return {
325+
changedDocs,
326+
readTime: this.serializer.fromDbTimestampKey(lastReadTime)
327+
};
328+
});
339329
}
340330

341331
/**
342-
* Sets the last processed read time to the maximum read time of the backing
343-
* object store, allowing calls to getNewDocumentChanges() to return subsequent
344-
* changes.
332+
* Returns the last document that has changed, as well as the read time of the
333+
* last change. If no document has changed, returns SnapshotVersion.MIN.
345334
*/
346-
private synchronizeLastProcessedReadTime(
347-
transaction: SimpleDbTransaction
348-
): PersistencePromise<void> {
349-
const documentsStore = SimpleDb.getStore<
350-
DbRemoteDocumentKey,
351-
DbRemoteDocument
352-
>(transaction, DbRemoteDocument.store);
353-
354-
// If there are no existing entries, we set `lastProcessedReadTime` to 0.
355-
this.lastProcessedReadTime = SnapshotVersion.forDeletedDoc();
356-
return documentsStore.iterate(
357-
{ index: DbRemoteDocument.readTimeIndex, reverse: true },
358-
(key, value, control) => {
359-
if (value.readTime) {
360-
this.lastProcessedReadTime = this.serializer.fromDbTimestampKey(
361-
value.readTime
362-
);
335+
// PORTING NOTE: This is only used for multi-tab synchronization.
336+
getLastDocumentChange(
337+
transaction: PersistenceTransaction
338+
): PersistencePromise<{
339+
changedDoc: MaybeDocument | undefined;
340+
readTime: SnapshotVersion;
341+
}> {
342+
const documentsStore = remoteDocumentsStore(transaction);
343+
344+
// If there are no existing entries, we return SnapshotVersion.MIN.
345+
let readTime = SnapshotVersion.MIN;
346+
let changedDoc: MaybeDocument | undefined;
347+
348+
return documentsStore
349+
.iterate(
350+
{ index: DbRemoteDocument.readTimeIndex, reverse: true },
351+
(key, dbRemoteDoc, control) => {
352+
changedDoc = this.serializer.fromDbRemoteDocument(dbRemoteDoc);
353+
if (dbRemoteDoc.readTime) {
354+
readTime = this.serializer.fromDbTimestampKey(dbRemoteDoc.readTime);
355+
}
356+
control.done();
363357
}
364-
control.done();
365-
}
366-
);
358+
)
359+
.next(() => {
360+
return { changedDoc, readTime };
361+
});
367362
}
368363

369364
newChangeBuffer(options?: {

packages/firestore/src/local/local_store.ts

Lines changed: 69 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import { SortedMap } from '../util/sorted_map';
4646
import { LocalDocumentsView } from './local_documents_view';
4747
import { LocalViewChanges } from './local_view_changes';
4848
import { LruGarbageCollector, LruResults } from './lru_garbage_collector';
49+
import { IndexedDbRemoteDocumentCache } from './indexeddb_remote_document_cache';
4950
import { MutationQueue } from './mutation_queue';
5051
import { Persistence, PersistenceTransaction } from './persistence';
5152
import { PersistencePromise } from './persistence_promise';
@@ -177,6 +178,13 @@ export class LocalStore {
177178
q.canonicalId()
178179
);
179180

181+
/**
182+
* The read time of the last entry processed by `getNewDocumentChanges()`.
183+
*
184+
* PORTING NOTE: This is only used for multi-tab synchronization.
185+
*/
186+
private lastDocumentChangeReadTime = SnapshotVersion.MIN;
187+
180188
constructor(
181189
/** Manages our in-memory or durable persistence. */
182190
private persistence: Persistence,
@@ -201,6 +209,11 @@ export class LocalStore {
201209
this.queryEngine.setLocalDocumentsView(this.localDocuments);
202210
}
203211

212+
/** Starts the LocalStore. */
213+
start(): Promise<void> {
214+
return this.synchronizeLastDocumentChangeReadTime();
215+
}
216+
204217
/**
205218
* Tells the LocalStore that the currently authenticated user has changed.
206219
*
@@ -209,10 +222,13 @@ export class LocalStore {
209222
*/
210223
// PORTING NOTE: Android and iOS only return the documents affected by the
211224
// change.
212-
handleUserChange(user: User): Promise<UserChangeResult> {
213-
return this.persistence.runTransaction(
225+
async handleUserChange(user: User): Promise<UserChangeResult> {
226+
let newMutationQueue = this.mutationQueue;
227+
let newLocalDocuments = this.localDocuments;
228+
229+
const result = await this.persistence.runTransaction(
214230
'Handle user change',
215-
'readonly',
231+
'readonly-idempotent',
216232
txn => {
217233
// Swap out the mutation queue, grabbing the pending mutation batches
218234
// before and after.
@@ -222,17 +238,16 @@ export class LocalStore {
222238
.next(promisedOldBatches => {
223239
oldBatches = promisedOldBatches;
224240

225-
this.mutationQueue = this.persistence.getMutationQueue(user);
241+
newMutationQueue = this.persistence.getMutationQueue(user);
226242

227243
// Recreate our LocalDocumentsView using the new
228244
// MutationQueue.
229-
this.localDocuments = new LocalDocumentsView(
245+
newLocalDocuments = new LocalDocumentsView(
230246
this.remoteDocuments,
231-
this.mutationQueue,
247+
newMutationQueue,
232248
this.persistence.getIndexManager()
233249
);
234-
this.queryEngine.setLocalDocumentsView(this.localDocuments);
235-
return this.mutationQueue.getAllMutationBatches(txn);
250+
return newMutationQueue.getAllMutationBatches(txn);
236251
})
237252
.next(newBatches => {
238253
const removedBatchIds: BatchId[] = [];
@@ -257,7 +272,7 @@ export class LocalStore {
257272

258273
// Return the set of all (potentially) changed documents and the list
259274
// of mutation batch IDs that were affected by change.
260-
return this.localDocuments
275+
return newLocalDocuments
261276
.getDocuments(txn, changedKeys)
262277
.next(affectedDocuments => {
263278
return {
@@ -269,7 +284,14 @@ export class LocalStore {
269284
});
270285
}
271286
);
287+
288+
this.mutationQueue = newMutationQueue;
289+
this.localDocuments = newLocalDocuments;
290+
this.queryEngine.setLocalDocumentsView(this.localDocuments);
291+
292+
return result;
272293
}
294+
273295
/* Accept locally generated Mutations and commit them to storage. */
274296
localWrite(mutations: Mutation[]): Promise<LocalWriteResult> {
275297
const localWriteTime = Timestamp.now();
@@ -1037,14 +1059,45 @@ export class LocalStore {
10371059
}
10381060
}
10391061

1062+
/**
1063+
* Returns the set of documents that have been updated since the last call.
1064+
* If this is the first call, returns the set of changes since client
1065+
* initialization. Further invocations will return document changes since
1066+
* the point of rejection.
1067+
*/
10401068
// PORTING NOTE: Multi-tab only.
10411069
getNewDocumentChanges(): Promise<MaybeDocumentMap> {
1042-
return this.persistence.runTransaction(
1043-
'Get new document changes',
1044-
'readonly',
1045-
txn => {
1046-
return this.remoteDocuments.getNewDocumentChanges(txn);
1047-
}
1048-
);
1070+
return this.persistence
1071+
.runTransaction('Get new document changes', 'readonly-idempotent', txn =>
1072+
this.remoteDocuments.getNewDocumentChanges(
1073+
txn,
1074+
this.lastDocumentChangeReadTime
1075+
)
1076+
)
1077+
.then(({ changedDocs, readTime }) => {
1078+
this.lastDocumentChangeReadTime = readTime;
1079+
return changedDocs;
1080+
});
1081+
}
1082+
1083+
/**
1084+
* Reads the newest document change from persistence and forwards the internal
1085+
* synchronization marker so that calls to `getNewDocumentChanges()`
1086+
* only return changes that happened after client initialization.
1087+
*/
1088+
// PORTING NOTE: Multi-tab only.
1089+
async synchronizeLastDocumentChangeReadTime(): Promise<void> {
1090+
if (this.remoteDocuments instanceof IndexedDbRemoteDocumentCache) {
1091+
const remoteDocumentCache = this.remoteDocuments;
1092+
return this.persistence
1093+
.runTransaction(
1094+
'Synchronize last document change read time',
1095+
'readonly-idempotent',
1096+
txn => remoteDocumentCache.getLastDocumentChange(txn)
1097+
)
1098+
.then(({ readTime }) => {
1099+
this.lastDocumentChangeReadTime = readTime;
1100+
});
1101+
}
10491102
}
10501103
}

0 commit comments

Comments
 (0)