Skip to content

Commit 37ecd7a

Browse files
Adding authoritative updates back
1 parent 4993479 commit 37ecd7a

File tree

6 files changed

+105
-36
lines changed

6 files changed

+105
-36
lines changed

packages/firestore/src/local/indexeddb_schema.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -450,7 +450,7 @@ export class DbRemoteDocument {
450450
* not known, but yet we no the document exist (e.g. it had a successful
451451
* update applied to it)
452452
*/
453-
public unknownDocument: DbUnknownDocument | null,
453+
public unknownDocument: DbUnknownDocument | null | undefined,
454454
/**
455455
* Set to an instance of a DbNoDocument if it is known that no document
456456
* exists.
@@ -467,7 +467,7 @@ export class DbRemoteDocument {
467467
* documents are potentially inconsistent with the document's version on
468468
* the backend.
469469
*/
470-
public hasCommittedMutations?: boolean
470+
public hasCommittedMutations: boolean | undefined
471471
) {}
472472
}
473473

packages/firestore/src/local/local_store.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import { ReferenceSet } from './reference_set';
5050
import { RemoteDocumentCache } from './remote_document_cache';
5151
import { RemoteDocumentChangeBuffer } from './remote_document_change_buffer';
5252
import { ClientId, SharedClientState } from './shared_client_state';
53+
import { Document } from '../model/document';
5354

5455
const LOG_TAG = 'LocalStore';
5556

@@ -498,13 +499,30 @@ export class LocalStore {
498499
const documentBuffer = new RemoteDocumentChangeBuffer(this.remoteDocuments);
499500
return this.persistence.runTransaction('Apply remote event', true, txn => {
500501
const promises = [] as Array<PersistencePromise<void>>;
502+
let authoritativeUpdates = documentKeySet();
501503
objUtils.forEachNumber(
502504
remoteEvent.targetChanges,
503505
(targetId: TargetId, change: TargetChange) => {
504506
// Do not ref/unref unassigned targetIds - it may lead to leaks.
505507
let queryData = this.queryDataByTarget[targetId];
506508
if (!queryData) return;
507509

510+
// When a global snapshot contains updates (either add or modify) we
511+
// can completely trust these updates as authoritative and blindly
512+
// apply them to our cache (as a defensive measure to promote
513+
// self-healing in the unfortunate case that our cache is ever somehow
514+
// corrupted / out-of-sync).
515+
//
516+
// If the document is only updated while removing it from a target
517+
// then watch isn't obligated to send the absolute latest version: it
518+
// can send the first version that caused the document not to match.
519+
change.addedDocuments.forEach(key => {
520+
authoritativeUpdates = authoritativeUpdates.add(key);
521+
});
522+
change.modifiedDocuments.forEach(key => {
523+
authoritativeUpdates = authoritativeUpdates.add(key);
524+
});
525+
508526
promises.push(
509527
this.queryCache
510528
.removeMatchingKeys(txn, change.removedDocuments, targetId)
@@ -542,9 +560,16 @@ export class LocalStore {
542560
changedDocKeys = changedDocKeys.add(key);
543561
promises.push(
544562
documentBuffer.getEntry(txn, key).next(existingDoc => {
563+
// If a document update isn't authoritative, make sure we don't
564+
// apply an old document version to the remote cache. We make an
565+
// exception for SnapshotVersion.MIN which can happen for
566+
// manufactured events (e.g. in the case of a limbo document
567+
// resolution failing).
545568
if (
546569
existingDoc == null ||
547570
doc.version.isEqual(SnapshotVersion.MIN) ||
571+
(authoritativeUpdates.has(doc.key) &&
572+
!existingDoc.hasPendingWrites()) ||
548573
doc.version.compareTo(existingDoc.version) >= 0
549574
) {
550575
documentBuffer.addEntry(doc);

packages/firestore/src/model/document.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ export abstract class MaybeDocument {
3838
return DocumentKey.comparator(d1.key, d2.key);
3939
}
4040

41+
abstract hasPendingWrites(): boolean;
42+
4143
abstract isEqual(other: MaybeDocument | null | undefined): boolean;
4244
}
4345

@@ -92,6 +94,10 @@ export class Document extends MaybeDocument {
9294
);
9395
}
9496

97+
hasPendingWrites(): boolean {
98+
return this.hasLocalMutations || this.hasCommittedMutations;
99+
}
100+
95101
static compareByField(field: FieldPath, d1: Document, d2: Document): number {
96102
const v1 = d1.field(field);
97103
const v2 = d2.field(field);
@@ -117,6 +123,10 @@ export class NoDocument extends MaybeDocument {
117123
return `NoDocument(${this.key}, ${this.version})`;
118124
}
119125

126+
hasPendingWrites(): boolean {
127+
return false;
128+
}
129+
120130
isEqual(other: MaybeDocument | null | undefined): boolean {
121131
return (
122132
other instanceof NoDocument &&
@@ -139,6 +149,10 @@ export class UnknownDocument extends MaybeDocument {
139149
return `UnknownDocument(${this.key}, ${this.version})`;
140150
}
141151

152+
hasPendingWrites(): boolean {
153+
return true;
154+
}
155+
142156
isEqual(other: MaybeDocument | null | undefined): boolean {
143157
return (
144158
other instanceof UnknownDocument &&

packages/firestore/src/model/mutation.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,7 @@ export class TransformMutation extends Mutation {
499499
maybeDoc,
500500
mutationResult.transformResults!
501501
);
502+
502503
const version = mutationResult.version;
503504
const newData = this.transformObject(doc.data, transformResults);
504505
return new Document(this.key, version, newData, {

packages/firestore/test/unit/specs/listen_spec.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,69 @@ describeSpec('Listens:', [], () => {
369369
);
370370
});
371371

372+
specTest('Waits until Watch catches up to local deletes ', [], () => {
373+
const query1 = Query.atPath(path('collection'));
374+
const docAv1 = doc('collection/a', 1000, { v: '1' });
375+
const docAv2 = doc('collection/a', 2000, { v: '2' });
376+
const docAv3 = doc('collection/a', 4000, { v: '3' });
377+
const docAv5 = doc('collection/a', 5000, { v: '5' });
378+
379+
return (
380+
spec()
381+
// Disable GC so the cache persists across listens.
382+
.withGCEnabled(false)
383+
.userListens(query1)
384+
.watchAcksFull(query1, 1000, docAv1)
385+
.expectEvents(query1, { added: [docAv1] })
386+
.userDeletes('collection/a')
387+
.expectEvents(query1, { removed: [docAv1] })
388+
.watchSends({ affects: [query1] }, docAv2)
389+
.watchSnapshots(2000)
390+
// The write stream acks our delete at version 4000.
391+
.writeAcks('collection/a', 4000)
392+
.watchSends({ affects: [query1] }, docAv3)
393+
.watchSnapshots(3000)
394+
// Watch now sends us a document past our deleted version.
395+
.watchSends({ affects: [query1] }, docAv5)
396+
.watchSnapshots(5000)
397+
.expectEvents(query1, { added: [docAv5] })
398+
);
399+
});
400+
401+
specTest('Deleted documents in cache are fixed', [], () => {
402+
const allQuery = Query.atPath(path('collection'));
403+
const setupQuery = allQuery.addFilter(filter('key', '==', 'a'));
404+
405+
const docAv1 = doc('collection/a', 1000, { key: 'a' });
406+
const docDeleted = deletedDoc('collection/a', 2000);
407+
408+
return (
409+
spec()
410+
// Presuppose an initial state where the remote document cache has a
411+
// broken synthesized delete at a timestamp later than the true version
412+
// of the document. This requires both adding and later removing the
413+
// document in order to force the watch change aggregator to propagate
414+
// the deletion.
415+
.withGCEnabled(false)
416+
.userListens(setupQuery)
417+
.watchAcksFull(setupQuery, 1000, docAv1)
418+
.expectEvents(setupQuery, { added: [docAv1], fromCache: false })
419+
.watchSends({ removed: [setupQuery] }, docDeleted)
420+
.watchSnapshots(2000, [setupQuery], 'resume-token-2000')
421+
.watchSnapshots(2000)
422+
.expectEvents(setupQuery, { removed: [docAv1], fromCache: false })
423+
.userUnlistens(setupQuery)
424+
.watchRemoves(setupQuery)
425+
426+
// Now when the client listens expect the cached NoDocument to be
427+
// discarded because the global snapshot version exceeds what came
428+
// before.
429+
.userListens(allQuery)
430+
.watchAcksFull(allQuery, 3000, docAv1)
431+
.expectEvents(allQuery, { added: [docAv1], fromCache: false })
432+
);
433+
});
434+
372435
specTest('Listens are reestablished after network disconnect', [], () => {
373436
const expectRequestCount = requestCounts =>
374437
requestCounts.addTarget + requestCounts.removeTarget;

packages/firestore/test/unit/specs/write_spec.test.ts

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -882,40 +882,6 @@ describeSpec('Writes:', [], () => {
882882
.expectEvents(query, {
883883
metadata: [docA]
884884
})
885-
);
886-
});
887-
888-
specTest('Writes are held during primary failover', ['multi-client'], () => {
889-
const query1 = Query.atPath(path('collection'));
890-
const query2 = Query.atPath(path('collection/doc'));
891-
const docV1 = doc(
892-
'collection/doc',
893-
0,
894-
{ v: 1 },
895-
{ hasLocalMutations: true }
896-
);
897-
const docV1Committed = doc(
898-
'collection/doc',
899-
2000,
900-
{ v: 1 },
901-
{ hasCommittedMutations: true }
902-
);
903-
const docV1Acknowledged = doc('collection/doc', 2000, { v: 1 });
904-
return (
905-
client(0)
906-
.userListens(query1)
907-
.userSets('collection/doc', { v: 1 })
908-
.expectEvents(query1, {
909-
hasPendingWrites: true,
910-
added: [docV1],
911-
fromCache: true
912-
})
913-
.watchAcksFull(query1, 1000)
914-
.expectEvents(query1, {
915-
hasPendingWrites: true
916-
})
917-
.writeAcks('collection/doc', 2000)
918-
// Start a new client. DocV1 still has pending writes.
919885
.client(1)
920886
.expectUserCallbacks({
921887
acknowledged: ['collection/a']

0 commit comments

Comments
 (0)