-
Notifications
You must be signed in to change notification settings - Fork 620
Don't persist documents that we already have #720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -325,54 +325,41 @@ public SnapshotVersion getLastRemoteSnapshotVersion() { | |
* <p>LocalDocuments are re-calculated if there are remaining mutations in the queue. | ||
*/ | ||
public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEvent remoteEvent) { | ||
SnapshotVersion remoteVersion = remoteEvent.getSnapshotVersion(); | ||
|
||
// TODO: Call queryEngine.handleDocumentChange() appropriately. | ||
return persistence.runTransaction( | ||
"Apply remote event", | ||
() -> { | ||
Map<Integer, TargetChange> targetChanges = remoteEvent.getTargetChanges(); | ||
long sequenceNumber = persistence.getReferenceDelegate().getCurrentSequenceNumber(); | ||
Set<DocumentKey> authoritativeUpdates = new HashSet<>(); | ||
|
||
Map<Integer, TargetChange> targetChanges = remoteEvent.getTargetChanges(); | ||
for (Map.Entry<Integer, TargetChange> entry : targetChanges.entrySet()) { | ||
Integer boxedTargetId = entry.getKey(); | ||
int targetId = boxedTargetId; | ||
TargetChange change = entry.getValue(); | ||
|
||
// Do not ref/unref unassigned targetIds - it may lead to leaks. | ||
QueryData queryData = targetIds.get(targetId); | ||
if (queryData == null) { | ||
QueryData oldQueryData = targetIds.get(targetId); | ||
if (oldQueryData == null) { | ||
// We don't update the remote keys if the query is not active. This ensures that | ||
// we persist the updated query data along with the updated assignment. | ||
continue; | ||
} | ||
|
||
// When a global snapshot contains updates (either add or modify) we can completely | ||
// trust these updates as authoritative and blindly apply them to our cache (as a | ||
// defensive measure to promote self-healing in the unfortunate case that our cache | ||
// is ever somehow corrupted / out-of-sync). | ||
// | ||
// If the document is only updated while removing it from a target then watch isn't | ||
// obligated to send the absolute latest version: it can send the first version that | ||
// caused the document not to match. | ||
for (DocumentKey key : change.getAddedDocuments()) { | ||
authoritativeUpdates.add(key); | ||
} | ||
for (DocumentKey key : change.getModifiedDocuments()) { | ||
authoritativeUpdates.add(key); | ||
} | ||
|
||
queryCache.removeMatchingKeys(change.getRemovedDocuments(), targetId); | ||
queryCache.addMatchingKeys(change.getAddedDocuments(), targetId); | ||
|
||
// Update the resume token if the change includes one. Don't clear any preexisting | ||
// value. | ||
ByteString resumeToken = change.getResumeToken(); | ||
// Update the resume token if the change includes one. | ||
if (!resumeToken.isEmpty()) { | ||
QueryData oldQueryData = queryData; | ||
queryData = | ||
queryData.copy(remoteEvent.getSnapshotVersion(), resumeToken, sequenceNumber); | ||
targetIds.put(boxedTargetId, queryData); | ||
|
||
if (shouldPersistQueryData(oldQueryData, queryData, change)) { | ||
queryCache.updateQueryData(queryData); | ||
QueryData newQueryData = | ||
oldQueryData.copy(remoteVersion, resumeToken, sequenceNumber); | ||
targetIds.put(boxedTargetId, newQueryData); | ||
|
||
// Update the query data if there are target changes (or if sufficient time has | ||
// passed since the last update). | ||
if (shouldPersistQueryData(oldQueryData, newQueryData, change)) { | ||
queryCache.updateQueryData(newQueryData); | ||
} | ||
} | ||
} | ||
|
@@ -391,10 +378,9 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve | |
MaybeDocument existingDoc = existingDocs.get(key); | ||
|
||
if (existingDoc == null | ||
|| (authoritativeUpdates.contains(doc.getKey()) && !existingDoc.hasPendingWrites()) | ||
|| doc.getVersion().compareTo(existingDoc.getVersion()) >= 0) { | ||
// If a document update isn't authoritative, make sure we don't apply an old document | ||
// version to the remote cache. | ||
|| doc.getVersion().compareTo(existingDoc.getVersion()) > 0 | ||
|| (doc.getVersion().compareTo(existingDoc.getVersion()) == 0 | ||
&& existingDoc.hasPendingWrites())) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, why do we overwrite the existing doc when it has pending writes? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the only important question, the rest are nitpicks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When the write stream sends us an acknowledgment, we persist a document with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for the explanation! |
||
remoteDocuments.add(doc); | ||
changedDocs.put(key, doc); | ||
} else if (doc instanceof NoDocument && doc.getVersion().equals(SnapshotVersion.NONE)) { | ||
|
@@ -422,7 +408,6 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve | |
// remote events when we get permission denied errors while trying to resolve the | ||
// state of a locally cached document that is in limbo. | ||
SnapshotVersion lastRemoteVersion = queryCache.getLastRemoteSnapshotVersion(); | ||
SnapshotVersion remoteVersion = remoteEvent.getSnapshotVersion(); | ||
if (!remoteVersion.equals(SnapshotVersion.NONE)) { | ||
hardAssert( | ||
remoteVersion.compareTo(lastRemoteVersion) >= 0, | ||
|
@@ -448,10 +433,11 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve | |
*/ | ||
private static boolean shouldPersistQueryData( | ||
QueryData oldQueryData, QueryData newQueryData, TargetChange change) { | ||
// Avoid clearing any existing value | ||
if (newQueryData.getResumeToken().isEmpty()) return false; | ||
hardAssert( | ||
!newQueryData.getResumeToken().isEmpty(), | ||
"Attempted to persist query data with empty resume token"); | ||
|
||
// Any resume token is interesting if there isn't one already. | ||
// Always persist query data if we don't already have a resume token. | ||
if (oldQueryData.getResumeToken().isEmpty()) return true; | ||
|
||
// Don't allow resume token changes to be buffered indefinitely. This allows us to be reasonably | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this comment no longer apply?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I rewrote it below since I didn't quite understand it at first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, cool. I also found the original comment confusing.