Skip to content

Don't persist documents that we already have #2099

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 30 additions & 50 deletions packages/firestore/src/local/local_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -449,37 +449,23 @@ export class LocalStore {
*/
applyRemoteEvent(remoteEvent: RemoteEvent): Promise<MaybeDocumentMap> {
const documentBuffer = this.remoteDocuments.newChangeBuffer();
const snapshotVersion = remoteEvent.snapshotVersion;
return this.persistence.runTransaction(
'Apply remote event',
'readwrite-primary',
txn => {
const promises = [] as Array<PersistencePromise<void>>;
let authoritativeUpdates = documentKeySet();
objUtils.forEachNumber(
remoteEvent.targetChanges,
(targetId: TargetId, change: TargetChange) => {
// Do not ref/unref unassigned targetIds - it may lead to leaks.
let queryData = this.queryDataByTarget[targetId];
if (!queryData) {
const oldQueryData = this.queryDataByTarget[targetId];
if (!oldQueryData) {
return;
}

// When a global snapshot contains updates (either add or modify) we
// can completely trust these updates as authoritative and blindly
// apply them to our cache (as a defensive measure to promote
// self-healing in the unfortunate case that our cache is ever somehow
// corrupted / out-of-sync).
//
// If the document is only updated while removing it from a target
// then watch isn't obligated to send the absolute latest version: it
// can send the first version that caused the document not to match.
change.addedDocuments.forEach(key => {
authoritativeUpdates = authoritativeUpdates.add(key);
});
change.modifiedDocuments.forEach(key => {
authoritativeUpdates = authoritativeUpdates.add(key);
});

// Only update the remote keys if the query is still active. This
// ensures that we can persist the updated query data along with
// the updated assignment.
promises.push(
this.queryCache
.removeMatchingKeys(txn, change.removedDocuments, targetId)
Expand All @@ -492,25 +478,27 @@ export class LocalStore {
})
);

// Update the resume token if the change includes one. Don't clear
// any preexisting value.
const resumeToken = change.resumeToken;
// Update the resume token if the change includes one.
if (resumeToken.length > 0) {
const oldQueryData = queryData;
queryData = queryData.copy({
const newQueryData = oldQueryData.copy({
resumeToken,
snapshotVersion: remoteEvent.snapshotVersion
snapshotVersion
});
this.queryDataByTarget[targetId] = queryData;
this.queryDataByTarget[targetId] = newQueryData;

// Update the query data if there are target changes (or if
// sufficient time has passed since the last update).
if (
LocalStore.shouldPersistQueryData(
oldQueryData,
queryData,
newQueryData,
change
)
) {
promises.push(this.queryCache.updateQueryData(txn, queryData));
promises.push(
this.queryCache.updateQueryData(txn, newQueryData)
);
}
}
}
Expand All @@ -528,19 +516,12 @@ export class LocalStore {
documentBuffer.getEntries(txn, updatedKeys).next(existingDocs => {
remoteEvent.documentUpdates.forEach((key, doc) => {
const existingDoc = existingDocs.get(key);
// If a document update isn't authoritative, make sure we don't
// apply an old document version to the remote cache. We make an
// exception for SnapshotVersion.MIN which can happen for
// manufactured events (e.g. in the case of a limbo document
// resolution failing).
if (
existingDoc == null ||
(authoritativeUpdates.has(doc.key) &&
!existingDoc.hasPendingWrites) ||
doc.version.compareTo(existingDoc.version) >= 0
doc.version.compareTo(existingDoc.version) > 0 ||
(doc.version.compareTo(existingDoc.version) === 0 &&
existingDoc.hasPendingWrites)
) {
// If a document update isn't authoritative, make sure we don't apply an old document
// version to the remote cache.
documentBuffer.addEntry(doc);
changedDocs = changedDocs.insert(key, doc);
} else if (
Expand Down Expand Up @@ -580,22 +561,21 @@ export class LocalStore {
// can synthesize remote events when we get permission denied errors while
// trying to resolve the state of a locally cached document that is in
// limbo.
const remoteVersion = remoteEvent.snapshotVersion;
if (!remoteVersion.isEqual(SnapshotVersion.MIN)) {
if (!snapshotVersion.isEqual(SnapshotVersion.MIN)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: why doesn't this change apply to the Android port?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a rename to which allows me to use an object shorthand notation (instead of {snapshotVersion:remoteVersion} I can now use {snapshotVersion}.

const updateRemoteVersion = this.queryCache
.getLastRemoteSnapshotVersion(txn)
.next(lastRemoteVersion => {
.next(lastRemoteSnapshotVersion => {
assert(
remoteVersion.compareTo(lastRemoteVersion) >= 0,
snapshotVersion.compareTo(lastRemoteSnapshotVersion) >= 0,
'Watch stream reverted to previous snapshot?? ' +
remoteVersion +
snapshotVersion +
' < ' +
lastRemoteVersion
lastRemoteSnapshotVersion
);
return this.queryCache.setTargetsMetadata(
txn,
txn.currentSequenceNumber,
remoteVersion
snapshotVersion
);
});
promises.push(updateRemoteVersion);
Expand Down Expand Up @@ -629,12 +609,12 @@ export class LocalStore {
newQueryData: QueryData,
change: TargetChange
): boolean {
// Avoid clearing any existing value
if (newQueryData.resumeToken.length === 0) {
return false;
}
assert(
newQueryData.resumeToken.length > 0,
'Attempted to persist query data with no resume token'
);

// Any resume token is interesting if there isn't one already.
// Always persist query data if we don't already have a resume token.
if (oldQueryData.resumeToken.length === 0) {
return true;
}
Expand Down
28 changes: 28 additions & 0 deletions packages/firestore/test/unit/local/local_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1261,4 +1261,32 @@ function genericLocalStoreTests(
.toReturnHighestUnacknowledgeBatchId(BATCHID_UNKNOWN)
.finish();
});

it('only persists updates for focuments when version changes', () => {
const query = Query.atPath(path('foo'));
return (
expectLocalStore()
.afterAllocatingQuery(query)
.toReturnTargetId(2)
.afterRemoteEvent(
docAddedRemoteEvent(doc('foo/bar', 1, { val: 'old' }), [2])
)
.toReturnChanged(doc('foo/bar', 1, { val: 'old' }))
.toContain(doc('foo/bar', 1, { val: 'old' }))
.afterRemoteEvent(
docAddedRemoteEvent(
[
doc('foo/bar', 1, { val: 'new' }),
doc('foo/baz', 2, { val: 'new' })
],
[2]
)
)
.toReturnChanged(doc('foo/baz', 2, { val: 'new' }))
// The update to foo/bar is ignored.
.toContain(doc('foo/bar', 1, { val: 'old' }))
.toContain(doc('foo/baz', 2, { val: 'new' }))
.finish()
);
});
}
34 changes: 0 additions & 34 deletions packages/firestore/test/unit/specs/listen_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,40 +413,6 @@ describeSpec('Listens:', [], () => {
);
});

specTest('Deleted documents in cache are fixed', [], () => {
const allQuery = Query.atPath(path('collection'));
const setupQuery = allQuery.addFilter(filter('key', '==', 'a'));

const docAv1 = doc('collection/a', 1000, { key: 'a' });
const docDeleted = deletedDoc('collection/a', 2000);

return (
spec()
// Presuppose an initial state where the remote document cache has a
// broken synthesized delete at a timestamp later than the true version
// of the document. This requires both adding and later removing the
// document in order to force the watch change aggregator to propagate
// the deletion.
.withGCEnabled(false)
.userListens(setupQuery)
.watchAcksFull(setupQuery, 1000, docAv1)
.expectEvents(setupQuery, { added: [docAv1], fromCache: false })
.watchSends({ removed: [setupQuery] }, docDeleted)
.watchSnapshots(2000, [setupQuery], 'resume-token-2000')
.watchSnapshots(2000)
.expectEvents(setupQuery, { removed: [docAv1], fromCache: false })
.userUnlistens(setupQuery)
.watchRemoves(setupQuery)

// Now when the client listens expect the cached NoDocument to be
// discarded because the global snapshot version exceeds what came
// before.
.userListens(allQuery)
.watchAcksFull(allQuery, 3000, docAv1)
.expectEvents(allQuery, { added: [docAv1], fromCache: false })
);
});

specTest('Listens are reestablished after network disconnect', [], () => {
const expectRequestCount = (requestCounts: {
[type: string]: number;
Expand Down
55 changes: 36 additions & 19 deletions packages/firestore/test/util/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -284,33 +284,50 @@ export function queryData(
}

export function docAddedRemoteEvent(
doc: MaybeDocument,
docOrDocs: MaybeDocument | MaybeDocument[],
updatedInTargets?: TargetId[],
removedFromTargets?: TargetId[],
limboTargets?: TargetId[]
activeTargets?: TargetId[]
): RemoteEvent {
assert(
!(doc instanceof Document) || !doc.hasLocalMutations,
"Docs from remote updates shouldn't have local changes."
);
const docChange = new DocumentWatchChange(
updatedInTargets || [],
removedFromTargets || [],
doc.key,
doc
);
const docs = Array.isArray(docOrDocs) ? docOrDocs : [docOrDocs];
assert(docs.length !== 0, 'Cannot pass empty docs array');

const allTargets = activeTargets
? activeTargets
: (updatedInTargets || []).concat(removedFromTargets || []);

const aggregator = new WatchChangeAggregator({
getRemoteKeysForTarget: () => documentKeySet(),
getQueryDataForTarget: targetId => {
const purpose =
limboTargets && limboTargets.indexOf(targetId) !== -1
? QueryPurpose.LimboResolution
: QueryPurpose.Listen;
return queryData(targetId, purpose, doc.key.toString());
if (allTargets.indexOf(targetId) !== -1) {
const collectionPath = docs[0].key.path.popLast();
return queryData(
targetId,
QueryPurpose.Listen,
collectionPath.toString()
);
} else {
return null;
}
}
});
aggregator.handleDocumentChange(docChange);
return aggregator.createRemoteEvent(doc.version);

for (const doc of docs) {
assert(
!(doc instanceof Document) || !doc.hasLocalMutations,
"Docs from remote updates shouldn't have local changes."
);
const docChange = new DocumentWatchChange(
updatedInTargets || [],
removedFromTargets || [],
doc.key,
doc
);
aggregator.handleDocumentChange(docChange);
}

const version = docs[0].version;
return aggregator.createRemoteEvent(version);
}

export function docUpdateRemoteEvent(
Expand Down