Skip to content

Dequeue limbo resolutions when their respective queries are stopped #4395

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Feb 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/eight-bananas-nail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@firebase/firestore': patch
---

Fixes a bug where local cache inconsistencies were unnecessarily being resolved.
29 changes: 22 additions & 7 deletions packages/firestore/src/core/sync_engine_impl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import { MaybeDocument, NoDocument } from '../model/document';
import { DocumentKey } from '../model/document_key';
import { Mutation } from '../model/mutation';
import { MutationBatchResult } from '../model/mutation_batch';
import { ResourcePath } from '../model/path';
import { RemoteEvent, TargetChange } from '../remote/remote_event';
import {
canUseNetwork,
Expand Down Expand Up @@ -208,9 +209,14 @@ class SyncEngineImpl implements SyncEngine {
queriesByTarget = new Map<TargetId, Query[]>();
/**
* The keys of documents that are in limbo for which we haven't yet started a
* limbo resolution query.
* limbo resolution query. The strings in this set are the result of calling
* `key.path.canonicalString()` where `key` is a `DocumentKey` object.
*
* The `Set` type was chosen because it provides efficient lookup and removal
* of arbitrary elements and it also maintains insertion order, providing the
* desired queue-like FIFO semantics.
*/
enqueuedLimboResolutions: DocumentKey[] = [];
enqueuedLimboResolutions = new Set<string>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should maybe mention that we can use this datastructure in JS because it preserves insertion order.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

/**
* Keeps track of the target ID for each document that is in limbo with an
* active target.
Expand Down Expand Up @@ -876,6 +882,8 @@ function removeLimboTarget(
syncEngineImpl: SyncEngineImpl,
key: DocumentKey
): void {
syncEngineImpl.enqueuedLimboResolutions.delete(key.path.canonicalString());

// It's possible that the target already got removed because the query failed. In that case,
// the key won't exist in `limboTargetsByKey`. Only do the cleanup if we still have the target.
const limboTargetId = syncEngineImpl.activeLimboTargetsByKey.get(key);
Expand Down Expand Up @@ -925,9 +933,13 @@ function trackLimboChange(
limboChange: AddedLimboDocument
): void {
const key = limboChange.key;
if (!syncEngineImpl.activeLimboTargetsByKey.get(key)) {
const keyString = key.path.canonicalString();
if (
!syncEngineImpl.activeLimboTargetsByKey.get(key) &&
!syncEngineImpl.enqueuedLimboResolutions.has(keyString)
) {
logDebug(LOG_TAG, 'New document in limbo: ' + key);
syncEngineImpl.enqueuedLimboResolutions.push(key);
syncEngineImpl.enqueuedLimboResolutions.add(keyString);
pumpEnqueuedLimboResolutions(syncEngineImpl);
}
}
Expand All @@ -942,11 +954,14 @@ function trackLimboChange(
*/
function pumpEnqueuedLimboResolutions(syncEngineImpl: SyncEngineImpl): void {
while (
syncEngineImpl.enqueuedLimboResolutions.length > 0 &&
syncEngineImpl.enqueuedLimboResolutions.size > 0 &&
syncEngineImpl.activeLimboTargetsByKey.size <
syncEngineImpl.maxConcurrentLimboResolutions
) {
const key = syncEngineImpl.enqueuedLimboResolutions.shift()!;
const keyString = syncEngineImpl.enqueuedLimboResolutions.values().next()
.value;
syncEngineImpl.enqueuedLimboResolutions.delete(keyString);
const key = new DocumentKey(ResourcePath.fromString(keyString));
const limboTargetId = syncEngineImpl.limboTargetIdGenerator.next();
syncEngineImpl.activeLimboResolutionsByTarget.set(
limboTargetId,
Expand Down Expand Up @@ -979,7 +994,7 @@ export function syncEngineGetActiveLimboDocumentResolutions(
// Visible for testing
export function syncEngineGetEnqueuedLimboDocumentResolutions(
syncEngine: SyncEngine
): DocumentKey[] {
): Set<string> {
const syncEngineImpl = debugCast(syncEngine, SyncEngineImpl);
return syncEngineImpl.enqueuedLimboResolutions;
}
Expand Down
145 changes: 145 additions & 0 deletions packages/firestore/test/unit/specs/limbo_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -922,4 +922,149 @@ describeSpec('Limbo Documents:', [], () => {
);
}
);

specTest(
'A limbo resolution for a document should not be started if one is already active',
[],
() => {
const doc1 = doc('collection/doc', 1000, { key: 1 });
const fullQuery = query('collection');
const filteredQuery1 = query('collection', filter('key', '==', 1));
const filteredQuery2 = query('collection', filter('key', '>=', 1));

return (
spec()
.withGCEnabled(false)

// Start a limbo resolution listen for a document (doc1).
.userListens(fullQuery)
.watchAcksFull(fullQuery, 1000, doc1)
.expectEvents(fullQuery, { added: [doc1] })
.userUnlistens(fullQuery)
.userListens(filteredQuery1)
.expectEvents(filteredQuery1, { added: [doc1], fromCache: true })
.watchAcksFull(filteredQuery1, 1001)
.expectLimboDocs(doc1.key)
.expectEnqueuedLimboDocs()

// Put doc1 into limbo in a different query; verify that another limbo
// resolution is neither started nor enqueued.
.userListens(filteredQuery2)
.expectEvents(filteredQuery2, { added: [doc1], fromCache: true })
.watchAcksFull(filteredQuery2, 1002)
.expectLimboDocs(doc1.key)
.expectEnqueuedLimboDocs()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Both of the tests below seem to be subsets of this test. I would therefore move this test under the other tests to first verify the basic implementation before going into the more completed cases.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

);
}
);

specTest(
'A limbo resolution for a document should not be enqueued if one is already enqueued',
[],
() => {
const doc1 = doc('collection1/doc1', 1000, { key: 1 });
const fullQuery1 = query('collection1');
const filteredQuery1 = query('collection1', filter('key', '==', 1));
const doc2 = doc('collection2/doc2', 1000, { key: 2 });
const fullQuery2 = query('collection2');
const filteredQuery2a = query('collection2', filter('key', '==', 2));
const filteredQuery2b = query('collection2', filter('key', '>=', 2));

return (
spec()
.withGCEnabled(false)
.withMaxConcurrentLimboResolutions(1)

// Max out the number of active limbo resolutions.
.userListens(fullQuery1)
.watchAcksFull(fullQuery1, 1000, doc1)
.expectEvents(fullQuery1, { added: [doc1] })
.userUnlistens(fullQuery1)
.userListens(filteredQuery1)
.expectEvents(filteredQuery1, { added: [doc1], fromCache: true })
.watchAcksFull(filteredQuery1, 1001)
.expectLimboDocs(doc1.key)

// Start a limbo resolution listen for a different document (doc2).
.userListens(fullQuery2)
.watchAcksFull(fullQuery2, 1002, doc2)
.expectEvents(fullQuery2, { added: [doc2] })
.userUnlistens(fullQuery2)
.userListens(filteredQuery2a)
.expectEvents(filteredQuery2a, { added: [doc2], fromCache: true })
.watchAcksFull(filteredQuery2a, 1003)
.expectLimboDocs(doc1.key)
.expectEnqueuedLimboDocs(doc2.key)

// Put doc2 into limbo in a different query and verify that it's not
// added to the limbo resolution queue again.
.userListens(filteredQuery2b)
.expectEvents(filteredQuery2b, { added: [doc2], fromCache: true })
.watchAcksFull(filteredQuery2b, 1004)
.expectLimboDocs(doc1.key)
.expectEnqueuedLimboDocs(doc2.key)
);
}
);

specTest(
'A limbo resolution for a document should be removed from the queue when the last query listen stops',
[],
() => {
const doc1 = doc('collection1/doc', 1000, { key: 1 });
const fullQuery1 = query('collection1');
const filteredQuery1 = query('collection1', filter('key', '==', 1));

const doc2 = doc('collection2/doc', 1000, { key: 2 });
const fullQuery2 = query('collection2');
const filteredQuery2a = query('collection2', filter('key', '==', 2));
const filteredQuery2b = query('collection2', filter('key', '>=', 2));

return (
spec()
.withGCEnabled(false)
.withMaxConcurrentLimboResolutions(1)

// Max out the number of active limbo resolutions.
.userListens(fullQuery1)
.watchAcksFull(fullQuery1, 1000, doc1)
.expectEvents(fullQuery1, { added: [doc1] })
.userUnlistens(fullQuery1)
.userListens(filteredQuery1)
.expectEvents(filteredQuery1, { added: [doc1], fromCache: true })
.watchAcksFull(filteredQuery1, 1001)
.expectLimboDocs(doc1.key)

// Enqueue a limbo resolution for doc2.
.userListens(fullQuery2)
.watchAcksFull(fullQuery2, 1002, doc2)
.expectEvents(fullQuery2, { added: [doc2] })
.userUnlistens(fullQuery2)
.userListens(filteredQuery2a)
.expectEvents(filteredQuery2a, { added: [doc2], fromCache: true })
.watchAcksFull(filteredQuery2a, 1003)
.expectLimboDocs(doc1.key)
.expectEnqueuedLimboDocs(doc2.key)

// Start another query that puts doc2 into limbo again.
.userListens(filteredQuery2b)
.expectEvents(filteredQuery2b, { added: [doc2], fromCache: true })
.watchAcksFull(filteredQuery2b, 1004)
.expectLimboDocs(doc1.key)
.expectEnqueuedLimboDocs(doc2.key)

// Stop one of the queries that enqueued a limbo resolution for doc2;
// verify that doc2 is not removed from the limbo resolution queue.
.userUnlistens(filteredQuery2b)
.expectLimboDocs(doc1.key)
.expectEnqueuedLimboDocs(doc2.key)

// Stop the other query that enqueued a limbo resolution for doc2;
// verify that doc2 *is* removed from the limbo resolution queue.
.userUnlistens(filteredQuery2a)
.expectLimboDocs(doc1.key)
.expectEnqueuedLimboDocs()
);
}
);
});
34 changes: 9 additions & 25 deletions packages/firestore/test/unit/specs/spec_test_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ import { primitiveComparator } from '../../../src/util/misc';
import { forEach, objectSize } from '../../../src/util/obj';
import { ObjectMap } from '../../../src/util/obj_map';
import { Deferred, sequence } from '../../../src/util/promise';
import { SortedSet } from '../../../src/util/sorted_set';
import {
byteStringFromString,
deletedDoc,
Expand Down Expand Up @@ -1008,31 +1007,16 @@ abstract class TestRunner {
}

private validateEnqueuedLimboDocs(): void {
let actualLimboDocs = new SortedSet<DocumentKey>(DocumentKey.comparator);
syncEngineGetEnqueuedLimboDocumentResolutions(this.syncEngine).forEach(
key => {
actualLimboDocs = actualLimboDocs.add(key);
}
const actualLimboDocs = Array.from(
syncEngineGetEnqueuedLimboDocumentResolutions(this.syncEngine)
);
const expectedLimboDocs = Array.from(this.expectedEnqueuedLimboDocs, key =>
key.path.canonicalString()
);
expect(actualLimboDocs).to.have.members(
expectedLimboDocs,
'The set of enqueued limbo documents is incorrect'
);
let expectedLimboDocs = new SortedSet<DocumentKey>(DocumentKey.comparator);
this.expectedEnqueuedLimboDocs.forEach(key => {
expectedLimboDocs = expectedLimboDocs.add(key);
});
actualLimboDocs.forEach(key => {
expect(expectedLimboDocs.has(key)).to.equal(
true,
`Found enqueued limbo doc ${key.toString()}, but it was not in ` +
`the set of expected enqueued limbo documents ` +
`(${expectedLimboDocs.toString()})`
);
});
expectedLimboDocs.forEach(key => {
expect(actualLimboDocs.has(key)).to.equal(
true,
`Expected doc ${key.toString()} to be enqueued for limbo resolution, ` +
`but it was not in the queue (${actualLimboDocs.toString()})`
);
});
}

private async validateActiveTargets(): Promise<void> {
Expand Down