Skip to content

Don't persist documents that we already have #720

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -325,54 +325,41 @@ public SnapshotVersion getLastRemoteSnapshotVersion() {
* <p>LocalDocuments are re-calculated if there are remaining mutations in the queue.
*/
public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEvent remoteEvent) {
SnapshotVersion remoteVersion = remoteEvent.getSnapshotVersion();

// TODO: Call queryEngine.handleDocumentChange() appropriately.
return persistence.runTransaction(
"Apply remote event",
() -> {
Map<Integer, TargetChange> targetChanges = remoteEvent.getTargetChanges();
long sequenceNumber = persistence.getReferenceDelegate().getCurrentSequenceNumber();
Set<DocumentKey> authoritativeUpdates = new HashSet<>();

Map<Integer, TargetChange> targetChanges = remoteEvent.getTargetChanges();
for (Map.Entry<Integer, TargetChange> entry : targetChanges.entrySet()) {
Integer boxedTargetId = entry.getKey();
int targetId = boxedTargetId;
TargetChange change = entry.getValue();

// Do not ref/unref unassigned targetIds - it may lead to leaks.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this comment no longer apply?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rewrote it below since I didn't quite understand it at first.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, cool. I also found the original comment confusing.

QueryData queryData = targetIds.get(targetId);
if (queryData == null) {
QueryData oldQueryData = targetIds.get(targetId);
if (oldQueryData == null) {
// We don't update the remote keys if the query is not active. This ensures that
// we persist the updated query data along with the updated assignment.
continue;
}

// When a global snapshot contains updates (either add or modify) we can completely
// trust these updates as authoritative and blindly apply them to our cache (as a
// defensive measure to promote self-healing in the unfortunate case that our cache
// is ever somehow corrupted / out-of-sync).
//
// If the document is only updated while removing it from a target then watch isn't
// obligated to send the absolute latest version: it can send the first version that
// caused the document not to match.
for (DocumentKey key : change.getAddedDocuments()) {
authoritativeUpdates.add(key);
}
for (DocumentKey key : change.getModifiedDocuments()) {
authoritativeUpdates.add(key);
}

queryCache.removeMatchingKeys(change.getRemovedDocuments(), targetId);
queryCache.addMatchingKeys(change.getAddedDocuments(), targetId);

// Update the resume token if the change includes one. Don't clear any preexisting
// value.
ByteString resumeToken = change.getResumeToken();
// Update the resume token if the change includes one.
if (!resumeToken.isEmpty()) {
QueryData oldQueryData = queryData;
queryData =
queryData.copy(remoteEvent.getSnapshotVersion(), resumeToken, sequenceNumber);
targetIds.put(boxedTargetId, queryData);

if (shouldPersistQueryData(oldQueryData, queryData, change)) {
queryCache.updateQueryData(queryData);
QueryData newQueryData =
oldQueryData.copy(remoteVersion, resumeToken, sequenceNumber);
targetIds.put(boxedTargetId, newQueryData);

// Update the query data if there are target changes (or if sufficient time has
// passed since the last update).
if (shouldPersistQueryData(oldQueryData, newQueryData, change)) {
queryCache.updateQueryData(newQueryData);
}
}
}
Expand All @@ -391,10 +378,9 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve
MaybeDocument existingDoc = existingDocs.get(key);

if (existingDoc == null
|| (authoritativeUpdates.contains(doc.getKey()) && !existingDoc.hasPendingWrites())
|| doc.getVersion().compareTo(existingDoc.getVersion()) >= 0) {
// If a document update isn't authoritative, make sure we don't apply an old document
// version to the remote cache.
|| doc.getVersion().compareTo(existingDoc.getVersion()) > 0
|| (doc.getVersion().compareTo(existingDoc.getVersion()) == 0
&& existingDoc.hasPendingWrites())) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, why do we overwrite the existing doc when it has pending writes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the only important question, the rest are nitpicks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the write stream sends us an acknowledgment, we persist a document with hasCommittedMutations. We then pretend we know the state of the document, but that is not always true (another client may have changed a field we didn't write to, or we supplied an ArrayUnion/ArrayRemove transform that was applied to a set of elements that the client didn't yet know about). By allowing the server to send us the state of a committed document, we overwrite the hasCommittedMutations version with the actual version from the backend.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the explanation!

remoteDocuments.add(doc);
changedDocs.put(key, doc);
} else if (doc instanceof NoDocument && doc.getVersion().equals(SnapshotVersion.NONE)) {
Expand Down Expand Up @@ -422,7 +408,6 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve
// remote events when we get permission denied errors while trying to resolve the
// state of a locally cached document that is in limbo.
SnapshotVersion lastRemoteVersion = queryCache.getLastRemoteSnapshotVersion();
SnapshotVersion remoteVersion = remoteEvent.getSnapshotVersion();
if (!remoteVersion.equals(SnapshotVersion.NONE)) {
hardAssert(
remoteVersion.compareTo(lastRemoteVersion) >= 0,
Expand All @@ -448,10 +433,11 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve
*/
private static boolean shouldPersistQueryData(
QueryData oldQueryData, QueryData newQueryData, TargetChange change) {
// Avoid clearing any existing value
if (newQueryData.getResumeToken().isEmpty()) return false;
hardAssert(
!newQueryData.getResumeToken().isEmpty(),
"Attempted to persist query data with empty resume token");

// Any resume token is interesting if there isn't one already.
// Always persist query data if we don't already have a resume token.
if (oldQueryData.getResumeToken().isEmpty()) return true;

// Don't allow resume token changes to be buffered indefinitely. This allows us to be reasonably
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1160,4 +1160,27 @@ public void testGetHighestUnacknowledgedBatchId() {
rejectMutation();
assertEquals(MutationBatch.UNKNOWN, localStore.getHighestUnacknowledgedBatchId());
}

@Test
public void testOnlyPersistsUpdatesForDocumentsWhenVersionChanges() {
Query query = Query.atPath(ResourcePath.fromString("foo"));
allocateQuery(query);
assertTargetId(2);

applyRemoteEvent(
addedRemoteEvent(doc("foo/bar", 1, map("val", "old")), asList(2), emptyList()));
assertChanged(doc("foo/bar", 1, map("val", "old"), Document.DocumentState.SYNCED));
assertContains(doc("foo/bar", 1, map("val", "old"), Document.DocumentState.SYNCED));

applyRemoteEvent(
addedRemoteEvent(
asList(doc("foo/bar", 1, map("val", "new")), doc("foo/baz", 2, map("val", "new"))),
asList(2),
emptyList()));

assertChanged(doc("foo/baz", 2, map("val", "new"), Document.DocumentState.SYNCED));
// The update for foo/bar is ignored.
assertContains(doc("foo/bar", 1, map("val", "old"), Document.DocumentState.SYNCED));
assertContains(doc("foo/baz", 2, map("val", "new"), Document.DocumentState.SYNCED));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@
"docs": [
{
"key": "collection/a",
"version": 1000,
"version": 1002,
"value": {
"key": "b"
},
Expand Down Expand Up @@ -948,7 +948,7 @@
"docs": [
{
"key": "collection/a",
"version": 1000,
"version": 1002,
"value": {
"key": "b"
},
Expand Down Expand Up @@ -1055,7 +1055,7 @@
"added": [
{
"key": "collection/a",
"version": 1000,
"version": 1002,
"value": {
"key": "b"
},
Expand Down
Loading