Skip to content

Commit 54e1a2f

Browse files
Make getNewDocumentChanges() idempotent (#2255)
1 parent ea3d9a2 commit 54e1a2f

10 files changed

+202
-153
lines changed

packages/firestore/src/core/firestore_client.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,8 @@ export class FirestoreClient {
400400
// TODO(index-free): Use IndexFreeQueryEngine/IndexedQueryEngine as appropriate.
401401
const queryEngine = new SimpleQueryEngine();
402402
this.localStore = new LocalStore(this.persistence, queryEngine, user);
403+
await this.localStore.start();
404+
403405
if (maybeLruGc) {
404406
// We're running LRU Garbage collection. Set up the scheduler.
405407
this.lruScheduler = new LruScheduler(

packages/firestore/src/local/indexeddb_persistence.ts

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -317,15 +317,12 @@ export class IndexedDbPersistence implements Persistence {
317317

318318
this.scheduleClientMetadataAndPrimaryLeaseRefreshes();
319319

320-
return this.startRemoteDocumentCache();
321-
})
322-
.then(() =>
323-
this.simpleDb.runTransaction(
320+
return this.simpleDb.runTransaction(
324321
'readonly-idempotent',
325322
[DbTargetGlobal.store],
326323
txn => getHighestListenSequenceNumber(txn)
327-
)
328-
)
324+
);
325+
})
329326
.then(highestListenSequenceNumber => {
330327
this.listenSequence = new ListenSequence(
331328
highestListenSequenceNumber,
@@ -341,12 +338,6 @@ export class IndexedDbPersistence implements Persistence {
341338
});
342339
}
343340

344-
private startRemoteDocumentCache(): Promise<void> {
345-
return this.simpleDb.runTransaction('readonly', ALL_STORES, txn =>
346-
this.remoteDocumentCache.start(txn)
347-
);
348-
}
349-
350341
setPrimaryStateListener(
351342
primaryStateListener: PrimaryStateListener
352343
): Promise<void> {

packages/firestore/src/local/indexeddb_remote_document_cache.ts

Lines changed: 48 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -46,18 +46,10 @@ import { PersistenceTransaction } from './persistence';
4646
import { PersistencePromise } from './persistence_promise';
4747
import { RemoteDocumentCache } from './remote_document_cache';
4848
import { RemoteDocumentChangeBuffer } from './remote_document_change_buffer';
49-
import {
50-
IterateOptions,
51-
SimpleDb,
52-
SimpleDbStore,
53-
SimpleDbTransaction
54-
} from './simple_db';
49+
import { IterateOptions, SimpleDbStore } from './simple_db';
5550
import { ObjectMap } from '../util/obj_map';
5651

5752
export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
58-
/** The read time of the last entry consumed by `getNewDocumentChanges()`. */
59-
private lastProcessedReadTime = SnapshotVersion.MIN;
60-
6153
/**
6254
* @param {LocalSerializer} serializer The document serializer.
6355
* @param {IndexManager} indexManager The query indexes that need to be maintained.
@@ -67,18 +59,6 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
6759
private readonly indexManager: IndexManager
6860
) {}
6961

70-
/**
71-
* Starts up the remote document cache.
72-
*
73-
* Reads the ID of the last document change from the documentChanges store.
74-
* Existing changes will not be returned as part of
75-
* `getNewDocumentChanges()`.
76-
*/
77-
// PORTING NOTE: This is only used for multi-tab synchronization.
78-
start(transaction: SimpleDbTransaction): PersistencePromise<void> {
79-
return this.synchronizeLastProcessedReadTime(transaction);
80-
}
81-
8262
/**
8363
* Adds the supplied entries to the cache.
8464
*
@@ -313,14 +293,21 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
313293
.next(() => results);
314294
}
315295

296+
/**
297+
* Returns the set of documents that have been updated since the specified read
298+
* time.
299+
*/
300+
// PORTING NOTE: This is only used for multi-tab synchronization.
316301
getNewDocumentChanges(
317-
transaction: PersistenceTransaction
318-
): PersistencePromise<MaybeDocumentMap> {
302+
transaction: PersistenceTransaction,
303+
sinceReadTime: SnapshotVersion
304+
): PersistencePromise<{
305+
changedDocs: MaybeDocumentMap;
306+
readTime: SnapshotVersion;
307+
}> {
319308
let changedDocs = maybeDocumentMap();
320309

321-
const lastReadTime = this.serializer.toDbTimestampKey(
322-
this.lastProcessedReadTime
323-
);
310+
let lastReadTime = this.serializer.toDbTimestampKey(sinceReadTime);
324311

325312
const documentsStore = remoteDocumentsStore(transaction);
326313
const range = IDBKeyRange.lowerBound(lastReadTime, true);
@@ -332,40 +319,48 @@ export class IndexedDbRemoteDocumentCache implements RemoteDocumentCache {
332319
// the documents directly since we want to keep sentinel deletes.
333320
const doc = this.serializer.fromDbRemoteDocument(dbRemoteDoc);
334321
changedDocs = changedDocs.insert(doc.key, doc);
335-
this.lastProcessedReadTime = this.serializer.fromDbTimestampKey(
336-
dbRemoteDoc.readTime!
337-
);
322+
lastReadTime = dbRemoteDoc.readTime!;
338323
}
339324
)
340-
.next(() => changedDocs);
325+
.next(() => {
326+
return {
327+
changedDocs,
328+
readTime: this.serializer.fromDbTimestampKey(lastReadTime)
329+
};
330+
});
341331
}
342332

343333
/**
344-
* Sets the last processed read time to the maximum read time of the backing
345-
* object store, allowing calls to getNewDocumentChanges() to return subsequent
346-
* changes.
334+
* Returns the last document that has changed, as well as the read time of the
335+
* last change. If no document has changed, returns SnapshotVersion.MIN.
347336
*/
348-
private synchronizeLastProcessedReadTime(
349-
transaction: SimpleDbTransaction
350-
): PersistencePromise<void> {
351-
const documentsStore = SimpleDb.getStore<
352-
DbRemoteDocumentKey,
353-
DbRemoteDocument
354-
>(transaction, DbRemoteDocument.store);
355-
356-
// If there are no existing entries, we set `lastProcessedReadTime` to 0.
357-
this.lastProcessedReadTime = SnapshotVersion.forDeletedDoc();
358-
return documentsStore.iterate(
359-
{ index: DbRemoteDocument.readTimeIndex, reverse: true },
360-
(key, value, control) => {
361-
if (value.readTime) {
362-
this.lastProcessedReadTime = this.serializer.fromDbTimestampKey(
363-
value.readTime
364-
);
337+
// PORTING NOTE: This is only used for multi-tab synchronization.
338+
getLastDocumentChange(
339+
transaction: PersistenceTransaction
340+
): PersistencePromise<{
341+
changedDoc: MaybeDocument | undefined;
342+
readTime: SnapshotVersion;
343+
}> {
344+
const documentsStore = remoteDocumentsStore(transaction);
345+
346+
// If there are no existing entries, we return SnapshotVersion.MIN.
347+
let readTime = SnapshotVersion.MIN;
348+
let changedDoc: MaybeDocument | undefined;
349+
350+
return documentsStore
351+
.iterate(
352+
{ index: DbRemoteDocument.readTimeIndex, reverse: true },
353+
(key, dbRemoteDoc, control) => {
354+
changedDoc = this.serializer.fromDbRemoteDocument(dbRemoteDoc);
355+
if (dbRemoteDoc.readTime) {
356+
readTime = this.serializer.fromDbTimestampKey(dbRemoteDoc.readTime);
357+
}
358+
control.done();
365359
}
366-
control.done();
367-
}
368-
);
360+
)
361+
.next(() => {
362+
return { changedDoc, readTime };
363+
});
369364
}
370365

371366
newChangeBuffer(options?: {

packages/firestore/src/local/local_store.ts

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import { ObjectMap } from '../util/obj_map';
4444
import { LocalDocumentsView } from './local_documents_view';
4545
import { LocalViewChanges } from './local_view_changes';
4646
import { LruGarbageCollector, LruResults } from './lru_garbage_collector';
47+
import { IndexedDbRemoteDocumentCache } from './indexeddb_remote_document_cache';
4748
import { MutationQueue } from './mutation_queue';
4849
import { Persistence, PersistenceTransaction } from './persistence';
4950
import { PersistencePromise } from './persistence_promise';
@@ -168,6 +169,13 @@ export class LocalStore {
168169
q.canonicalId()
169170
);
170171

172+
/**
173+
* The read time of the last entry processed by `getNewDocumentChanges()`.
174+
*
175+
* PORTING NOTE: This is only used for multi-tab synchronization.
176+
*/
177+
private lastDocumentChangeReadTime = SnapshotVersion.MIN;
178+
171179
constructor(
172180
/** Manages our in-memory or durable persistence. */
173181
private persistence: Persistence,
@@ -192,6 +200,11 @@ export class LocalStore {
192200
this.queryEngine.setLocalDocumentsView(this.localDocuments);
193201
}
194202

203+
/** Starts the LocalStore. */
204+
start(): Promise<void> {
205+
return this.synchronizeLastDocumentChangeReadTime();
206+
}
207+
195208
/**
196209
* Tells the LocalStore that the currently authenticated user has changed.
197210
*
@@ -1015,14 +1028,45 @@ export class LocalStore {
10151028
}
10161029
}
10171030

1031+
/**
1032+
* Returns the set of documents that have been updated since the last call.
1033+
* If this is the first call, returns the set of changes since client
1034+
* initialization. Further invocations will return document changes since
1035+
* the point of rejection.
1036+
*/
10181037
// PORTING NOTE: Multi-tab only.
10191038
getNewDocumentChanges(): Promise<MaybeDocumentMap> {
1020-
return this.persistence.runTransaction(
1021-
'Get new document changes',
1022-
'readonly',
1023-
txn => {
1024-
return this.remoteDocuments.getNewDocumentChanges(txn);
1025-
}
1026-
);
1039+
return this.persistence
1040+
.runTransaction('Get new document changes', 'readonly-idempotent', txn =>
1041+
this.remoteDocuments.getNewDocumentChanges(
1042+
txn,
1043+
this.lastDocumentChangeReadTime
1044+
)
1045+
)
1046+
.then(({ changedDocs, readTime }) => {
1047+
this.lastDocumentChangeReadTime = readTime;
1048+
return changedDocs;
1049+
});
1050+
}
1051+
1052+
/**
1053+
* Reads the newest document change from persistence and forwards the internal
1054+
* synchronization marker so that calls to `getNewDocumentChanges()`
1055+
* only return changes that happened after client initialization.
1056+
*/
1057+
// PORTING NOTE: Multi-tab only.
1058+
async synchronizeLastDocumentChangeReadTime(): Promise<void> {
1059+
if (this.remoteDocuments instanceof IndexedDbRemoteDocumentCache) {
1060+
const remoteDocumentCache = this.remoteDocuments;
1061+
return this.persistence
1062+
.runTransaction(
1063+
'Synchronize last document change read time',
1064+
'readonly-idempotent',
1065+
txn => remoteDocumentCache.getLastDocumentChange(txn)
1066+
)
1067+
.then(({ readTime }) => {
1068+
this.lastDocumentChangeReadTime = readTime;
1069+
});
1070+
}
10271071
}
10281072
}

packages/firestore/src/local/memory_remote_document_cache.ts

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,14 @@
1818
import { Query } from '../core/query';
1919
import {
2020
DocumentKeySet,
21-
documentKeySet,
2221
DocumentMap,
2322
documentMap,
2423
DocumentSizeEntry,
2524
MaybeDocumentMap,
26-
maybeDocumentMap,
2725
NullableMaybeDocumentMap,
2826
nullableMaybeDocumentMap
2927
} from '../model/collections';
30-
import { Document, MaybeDocument, NoDocument } from '../model/document';
28+
import { Document, MaybeDocument } from '../model/document';
3129
import { DocumentKey } from '../model/document_key';
3230

3331
import { SnapshotVersion } from '../core/snapshot_version';
@@ -57,9 +55,6 @@ export class MemoryRemoteDocumentCache implements RemoteDocumentCache {
5755
/** Underlying cache of documents and their read times. */
5856
private docs = documentEntryMap();
5957

60-
/** Set of documents changed since last call to `getNewDocumentChanges()`. */
61-
private newDocumentChanges = documentKeySet();
62-
6358
/** Size of all cached documents. */
6459
private size = 0;
6560

@@ -99,7 +94,6 @@ export class MemoryRemoteDocumentCache implements RemoteDocumentCache {
9994
readTime
10095
});
10196

102-
this.newDocumentChanges = this.newDocumentChanges.add(key);
10397
this.size += currentSize - previousSize;
10498

10599
return this.indexManager.addToCollectionParentIndex(
@@ -117,7 +111,6 @@ export class MemoryRemoteDocumentCache implements RemoteDocumentCache {
117111
private removeEntry(documentKey: DocumentKey): void {
118112
const entry = this.docs.get(documentKey);
119113
if (entry) {
120-
this.newDocumentChanges = this.newDocumentChanges.add(documentKey);
121114
this.docs = this.docs.remove(documentKey);
122115
this.size -= entry.size;
123116
}
@@ -184,21 +177,15 @@ export class MemoryRemoteDocumentCache implements RemoteDocumentCache {
184177
}
185178

186179
getNewDocumentChanges(
187-
transaction: PersistenceTransaction
188-
): PersistencePromise<MaybeDocumentMap> {
189-
let changedDocs = maybeDocumentMap();
190-
191-
this.newDocumentChanges.forEach(key => {
192-
const entry = this.docs.get(key);
193-
const changedDoc = entry
194-
? entry.maybeDocument
195-
: new NoDocument(key, SnapshotVersion.forDeletedDoc());
196-
changedDocs = changedDocs.insert(key, changedDoc);
197-
});
198-
199-
this.newDocumentChanges = documentKeySet();
200-
201-
return PersistencePromise.resolve(changedDocs);
180+
transaction: PersistenceTransaction,
181+
sinceReadTime: SnapshotVersion
182+
): PersistencePromise<{
183+
changedDocs: MaybeDocumentMap;
184+
readTime: SnapshotVersion;
185+
}> {
186+
throw new Error(
187+
'getNewDocumentChanges() is not supported with MemoryPersistence'
188+
);
202189
}
203190

204191
newChangeBuffer(options?: {

packages/firestore/src/local/remote_document_cache.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,17 @@ export interface RemoteDocumentCache {
8383
): PersistencePromise<DocumentMap>;
8484

8585
/**
86-
* Returns the set of documents that have been updated since the last call.
87-
* If this is the first call, returns the set of changes since client
88-
* initialization. Further invocations will return document changes since
89-
* the point of rejection.
86+
* Returns the set of documents that have changed since the specified read
87+
* time.
9088
*/
9189
// PORTING NOTE: This is only used for multi-tab synchronization.
9290
getNewDocumentChanges(
93-
transaction: PersistenceTransaction
94-
): PersistencePromise<MaybeDocumentMap>;
91+
transaction: PersistenceTransaction,
92+
sinceReadTime: SnapshotVersion
93+
): PersistencePromise<{
94+
changedDocs: MaybeDocumentMap;
95+
readTime: SnapshotVersion;
96+
}>;
9597

9698
/**
9799
* Provides access to add or update the contents of the cache. The buffer

packages/firestore/test/unit/local/local_store.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -443,6 +443,7 @@ function genericLocalStoreTests(
443443
countingQueryEngine,
444444
User.UNAUTHENTICATED
445445
);
446+
await localStore.start();
446447
});
447448

448449
afterEach(async () => {

0 commit comments

Comments
 (0)