Add mutation batch id to index backfill #3202
Changes from 2 commits
```diff
@@ -14,6 +14,7 @@
 package com.google.firebase.firestore.local;
 
+import android.util.Pair;
 import androidx.annotation.Nullable;
 import com.google.firebase.firestore.model.DocumentKey;
 import com.google.firebase.firestore.model.ResourcePath;
@@ -54,4 +55,14 @@ public interface DocumentOverlayCache {
    * change past `sinceBatchId` are returned.
    */
   Map<DocumentKey, Mutation> getOverlays(ResourcePath collection, int sinceBatchId);
+
+  /**
+   * Returns a mapping of overlays containing the largest batch id for each overlay mutation.
+   *
+   * @param collection The collection path to get the overlays for.
+   * @param sinceBatchId The minimum batch ID to filter by (exclusive). Only overlays that contain
+   *     a change past `sinceBatchId` are returned.
+   */
+  Map<DocumentKey, Pair<Integer, Mutation>> getOverlaysWithBatchId(
+      ResourcePath collection, int sinceBatchId);
 }
```

**Reviewer:** I am surprised we don't have a high-level class that contains both the mutation and the batch ID for overlays. We should probably create one and combine this method with the existing method just above. (cc @wu-hui)

**Author:** Renamed this method into the original `getOverlays` method.
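The wrapper class the reviewer asks for did not exist at this point in the PR. A minimal sketch of what it could look like; the class name, package placement, and accessors are assumptions rather than code from this change (a later revision of the SDK added a similar model-level Overlay class):

```java
package com.google.firebase.firestore.local;

import com.google.firebase.firestore.model.mutation.Mutation;

// Hypothetical value class pairing an overlay mutation with the largest batch id
// that touched it, as suggested in the review thread above.
public final class Overlay {
  private final int largestBatchId;
  private final Mutation mutation;

  public Overlay(int largestBatchId, Mutation mutation) {
    this.largestBatchId = largestBatchId;
    this.mutation = mutation;
  }

  public int getLargestBatchId() {
    return largestBatchId;
  }

  public Mutation getMutation() {
    return mutation;
  }
}
```

With such a class, `getOverlays` and `getOverlaysWithBatchId` collapse into one method returning `Map<DocumentKey, Overlay>`, which is the direction the author ultimately took.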
```diff
@@ -137,23 +137,56 @@ private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
 
   /** Writes entries for the fetched field indexes. */
   private int writeEntriesForCollectionGroup(
-      LocalDocumentsView localDocumentsView, String collectionGroup, int entriesRemainingUnderCap) {
+      LocalDocumentsView localDocumentsView,
+      String collectionGroup,
+      int documentsRemainingUnderCap) {
+    int documentsProcessed = 0;
     Query query = new Query(ResourcePath.EMPTY, collectionGroup);
 
     // Use the earliest offset of all field indexes to query the local cache.
-    IndexOffset existingOffset = getExistingOffset(indexManager.getFieldIndexes(collectionGroup));
+    Collection<FieldIndex> fieldIndexes = indexManager.getFieldIndexes(collectionGroup);
+    IndexOffset existingOffset = getExistingOffset(fieldIndexes);
 
     // TODO(indexing): Use limit queries to only fetch the required number of entries.
+    // TODO(indexing): Support mutation batch Ids when sorting and writing indexes.
     ImmutableSortedMap<DocumentKey, Document> documents =
         localDocumentsView.getDocumentsMatchingQuery(query, existingOffset);
 
-    List<Document> oldestDocuments = getOldestDocuments(documents, entriesRemainingUnderCap);
+    List<Document> oldestDocuments = getOldestDocuments(documents, documentsRemainingUnderCap);
     indexManager.updateIndexEntries(oldestDocuments);
+    documentsProcessed += oldestDocuments.size();
 
     IndexOffset newOffset = getNewOffset(oldestDocuments, existingOffset);
 
+    // Start indexing mutations only after catching up for read time. This must be done separately
+    // since the backfill's first pass is in read time order rather than mutation batch id order.
+    int documentsRemaining = documentsRemainingUnderCap - oldestDocuments.size();
```
**Reviewer:** Can you move this to a helper method for better encapsulation? We might also want to do the same for the RemoteDocumentCache code.

**Author:** Moved into LocalDocumentsView.
```diff
+    if (documentsRemaining > 0) {
+      int earliestBatchId = getEarliestBatchId(fieldIndexes);
+      newOffset =
+          IndexOffset.create(newOffset.getReadTime(), newOffset.getDocumentKey(), earliestBatchId);
+      Map<Document, Integer> documentToBatchId =
+          localDocumentsView.getDocumentsBatchIdsMatchingCollectionGroupQuery(query, newOffset);
+
+      // Write index entries for documents touched by local mutations and update the offset to the
+      // new largest batch id.
+      if (!documentToBatchId.isEmpty()) {
+        List<Document> documentsByBatchId =
+            getDocumentsByBatchId(documentToBatchId, documentsRemaining);
+
+        // Largest batch id is the last element in documentsByBatchId since it's sorted in
+        // ascending order by batch id.
+        int newLargestBatchId =
+            documentToBatchId.get(documentsByBatchId.get(documentsByBatchId.size() - 1));
+        newOffset =
+            IndexOffset.create(
+                newOffset.getReadTime(), newOffset.getDocumentKey(), newLargestBatchId);
+        indexManager.updateIndexEntries(documentsByBatchId);
+        documentsProcessed += documentsByBatchId.size();
+      }
+    }
 
     indexManager.updateCollectionGroup(collectionGroup, newOffset);
-    return oldestDocuments.size();
+    return documentsProcessed;
   }
 
   /** Returns the lowest offset for the provided index group. */
```

**Reviewer:** I was hoping we would have one existing offset for both the scan of the RDC and the mutations, but I now realize that is not the case. We could change getExistingOffset() to return an offset with the lowest SnapshotVersion+Key as well as the lowest batch ID (which might be from a different index). What do you think?

**Author:** Done, moved into LocalDocumentsView.

**Reviewer:** Can you move this into …

**Author:** N/A after moving into LocalDocumentsView.

**Reviewer:** If I follow correctly, we are now calling this twice for a document if a document is returned from both the RDC and the mutation queue. My hunch is that we may even need to move all mutation ID processing to …

**Author:** Added a set in LDV to track document keys, and a test to check we only index a document once, even if it appears in both mutations and the RDC.
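The first thread above asks for the mutation pass to be pulled into a helper (it ultimately moved into LocalDocumentsView). For illustration, a rough sketch of what an in-class extraction could have looked like; the method name is invented, and the offset bookkeeping is assumed to stay at the call site:

```java
// Hypothetical helper, not code from this PR: indexes documents touched by local
// mutations and reports how many were processed. Relies on the surrounding
// IndexBackfiller members (indexManager, getDocumentsByBatchId).
private int writeEntriesFromMutations(
    LocalDocumentsView localDocumentsView,
    Query query,
    IndexOffset offset,
    int documentsRemaining) {
  Map<Document, Integer> documentToBatchId =
      localDocumentsView.getDocumentsBatchIdsMatchingCollectionGroupQuery(query, offset);
  if (documentToBatchId.isEmpty()) {
    return 0;
  }
  List<Document> documentsByBatchId =
      getDocumentsByBatchId(documentToBatchId, documentsRemaining);
  indexManager.updateIndexEntries(documentsByBatchId);
  return documentsByBatchId.size();
}
```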
```diff
@@ -169,7 +202,8 @@ private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
   }
 
   /**
-   * Returns the offset for the index based on the newly indexed documents.
+   * Returns the read time and document key offset for the index based on the newly indexed
+   * documents.
    *
    * @param documents a list of documents sorted by read time and key (ascending)
    * @param currentOffset the current offset of the index group
@@ -201,6 +235,40 @@ private List<Document> getOldestDocuments(
     return oldestDocuments.subList(0, Math.min(count, oldestDocuments.size()));
   }
```
```diff
+  /**
+   * Returns up to {@code count} documents sorted by batch id ascending, except in cases where the
+   * mutation batch id contains more documents. This method will always return all documents in a
+   * mutation batch, even if it goes over the limit.
+   *
+   * <p>For example, if count = 2, and we have: (docA, batch1), (docB, batch2), and (docC, batch2),
+   * all three documents will be returned since docC is also part of batch2.
+   */
+  private List<Document> getDocumentsByBatchId(
+      Map<Document, Integer> documentToBatchId, int count) {
+    List<Document> oldestDocuments = new ArrayList<>(documentToBatchId.keySet());
+    Collections.sort(
+        oldestDocuments, (l, r) -> documentToBatchId.get(l).compareTo(documentToBatchId.get(r)));
+    int i = Math.min(count, oldestDocuments.size()) - 1;
+
+    // Include all documents that match the last document's batch id.
+    int lastBatchId = documentToBatchId.get(oldestDocuments.get(i));
+    while (i < oldestDocuments.size()
+        && lastBatchId == documentToBatchId.get(oldestDocuments.get(i))) {
+      ++i;
+    }
+    return oldestDocuments.subList(0, i);
+  }
```

**Reviewer:** It sounds like all bets are off when the batch ID contains more documents. Maybe rewrite the intro to make it clearer that this only applies to the count.

**Author:** Removed this method after refactoring into LDV.
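The example in the javadoc is easy to verify in isolation. Below is a self-contained demo of the same cutoff rule, with plain strings standing in for Document; all names and values are invented for illustration (it also assumes count >= 1, as the caller above guarantees):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BatchCutoffDemo {
  // Same cutoff logic as getDocumentsByBatchId above: sort by batch id, take the
  // first `count` entries, then extend the slice through the last entry's batch.
  static List<String> docsByBatchId(Map<String, Integer> docToBatch, int count) {
    List<String> docs = new ArrayList<>(docToBatch.keySet());
    docs.sort(Comparator.comparing(docToBatch::get));
    int i = Math.min(count, docs.size()) - 1;
    int lastBatchId = docToBatch.get(docs.get(i));
    while (i < docs.size() && lastBatchId == docToBatch.get(docs.get(i))) {
      ++i;
    }
    return docs.subList(0, i);
  }

  public static void main(String[] args) {
    Map<String, Integer> m = new LinkedHashMap<>();
    m.put("docA", 1);
    m.put("docB", 2);
    m.put("docC", 2);
    // count = 2, but docC shares batch 2 with docB, so all three are returned.
    System.out.println(docsByBatchId(m, 2)); // [docA, docB, docC]
  }
}
```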
```diff
+
+  /** Returns the earliest batch id from the specified field indexes. */
+  private int getEarliestBatchId(Collection<FieldIndex> fieldIndexes) {
+    int lowestBatchId = Integer.MAX_VALUE;
+    for (FieldIndex fieldIndex : fieldIndexes) {
+      lowestBatchId =
+          Math.min(fieldIndex.getIndexState().getOffset().getLargestBatchId(), lowestBatchId);
+    }
+    return lowestBatchId;
+  }
```
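One subtlety worth noting: if `fieldIndexes` is empty, the loop never runs and the `Integer.MAX_VALUE` sentinel is returned unchanged. A tiny self-contained illustration of the same min-fold pattern, with bare ints standing in for FieldIndex offsets (values invented):

```java
import java.util.Arrays;
import java.util.Collection;

public class EarliestBatchIdDemo {
  // Same min-fold as getEarliestBatchId above; an empty collection yields the sentinel.
  static int earliest(Collection<Integer> batchIds) {
    int lowest = Integer.MAX_VALUE;
    for (int id : batchIds) {
      lowest = Math.min(id, lowest);
    }
    return lowest;
  }

  public static void main(String[] args) {
    System.out.println(earliest(Arrays.asList(7, 3, 5))); // 3
    System.out.println(earliest(Arrays.asList()));        // 2147483647 (sentinel)
  }
}
```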
```diff
 
   @VisibleForTesting
   void setMaxDocumentsToProcess(int newMax) {
     maxDocumentsToProcess = newMax;
   }
```
```diff
@@ -17,6 +17,7 @@
 import static com.google.firebase.firestore.model.DocumentCollections.emptyDocumentMap;
 import static com.google.firebase.firestore.util.Assert.hardAssert;
 
+import android.util.Pair;
 import androidx.annotation.VisibleForTesting;
 import com.google.firebase.Timestamp;
 import com.google.firebase.database.collection.ImmutableSortedMap;
@@ -316,7 +317,31 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollectionGroupQuery(
     }
   }
 
-  /** Queries the remote documents and overlays by doing a full collection scan. */
+  /**
+   * Returns a mapping of documents that match the provided query and offset mapped to the
+   * corresponding batch id.
+   */
+  // TODO(indexing): Support adding local mutations to collection group table.
```
**Reviewer:** I actually looked this up the other day, but I think we are doing this already: …

**Reviewer:** Oh, I see now that the ignored test is failing because the mutations are only being added to the overlay cache, not going through the normal flow. If that's the case, can we delete the ignored test case? (The other option is to add the mutation into the mutation queue as well.)
```diff
+  public Map<Document, Integer> getDocumentsBatchIdsMatchingCollectionGroupQuery(
+      Query query, IndexOffset offset) {
+    hardAssert(
+        query.getPath().isEmpty(),
+        "Currently we only support collection group queries at the root.");
+    String collectionId = query.getCollectionGroup();
+    Map<Document, Integer> results = new HashMap<>();
+    List<ResourcePath> parents = indexManager.getCollectionParents(collectionId);
+
+    // Perform a collection query against each parent that contains the collectionId and
+    // aggregate the results.
+    for (ResourcePath parent : parents) {
+      Query collectionQuery = query.asCollectionQueryAtPath(parent.append(collectionId));
+      Map<Document, Integer> documentToBatchIds =
+          getDocumentsMatchingCollectionQueryWithBatchId(collectionQuery, offset);
+      results.putAll(documentToBatchIds);
+    }
+    return results;
+  }
```

**Reviewer:** Your code would be cleaner if this method returned … The name is also quite a mouthful, and the method should probably just accept a …

**Author:** Done, but cannot think of a cleaner name.
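To make the fan-out concrete: each parent path recorded in the collection-parents index gets its own collection query rooted at that parent. A hedged sketch, reusing the constructor and APIs from the method above; the group name and parent paths are invented:

```java
// Sketch only: fan a collection group query for "messages" out over its parents.
// With parents [rooms/roomA, rooms/roomB] this issues two collection queries,
// rooms/roomA/messages and rooms/roomB/messages, and merges their results.
Query group = new Query(ResourcePath.EMPTY, "messages");
for (ResourcePath parent : indexManager.getCollectionParents("messages")) {
  Query perParent = group.asCollectionQueryAtPath(parent.append("messages"));
  // ... run perParent against the caches and merge into one result map ...
}
```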
```diff
   private ImmutableSortedMap<DocumentKey, Document>
       getDocumentsMatchingCollectionQueryFromOverlayCache(Query query, IndexOffset offset) {
     ImmutableSortedMap<DocumentKey, MutableDocument> remoteDocuments =
@@ -325,27 +350,65 @@
 
     // As documents might match the query because of their overlay we need to include all documents
     // in the result.
+    remoteDocuments = updateRemoteDocumentsWithOverlayDocuments(remoteDocuments, overlays.keySet());
 
     // Apply the overlays and match against the query.
     ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
     for (Map.Entry<DocumentKey, MutableDocument> docEntry : remoteDocuments) {
       Mutation overlay = overlays.get(docEntry.getKey());
       if (overlay != null) {
         overlay.applyToLocalView(docEntry.getValue(), null, Timestamp.now());
       }
       // Finally, insert the documents that still match the query
       if (query.matches(docEntry.getValue())) {
         results = results.insert(docEntry.getKey(), docEntry.getValue());
       }
     }
 
     return results;
   }
```
```diff
+  /** Updates the provided remote documents to include documents in the overlay. */
+  private ImmutableSortedMap<DocumentKey, MutableDocument>
+      updateRemoteDocumentsWithOverlayDocuments(
+          ImmutableSortedMap<DocumentKey, MutableDocument> remoteDocuments,
+          Set<DocumentKey> overlayDocumentKeys) {
     Set<DocumentKey> missingDocuments = new HashSet<>();
-    for (Map.Entry<DocumentKey, Mutation> entry : overlays.entrySet()) {
-      if (!remoteDocuments.containsKey(entry.getKey())) {
-        missingDocuments.add(entry.getKey());
+    for (DocumentKey key : overlayDocumentKeys) {
+      if (!remoteDocuments.containsKey(key)) {
+        missingDocuments.add(key);
       }
     }
     for (Map.Entry<DocumentKey, MutableDocument> entry :
         remoteDocumentCache.getAll(missingDocuments).entrySet()) {
       remoteDocuments = remoteDocuments.insert(entry.getKey(), entry.getValue());
     }
+    return remoteDocuments;
+  }
```

**Reviewer:** I tried refactoring out shared code between …

**Author:** We are probably at a point where we should just remove the non-overlay code.
```diff
   /** Queries the remote documents and overlays by doing a full collection scan. */
+  public Map<Document, Integer> getDocumentsMatchingCollectionQueryWithBatchId(
+      Query query, IndexOffset offset) {
     ImmutableSortedMap<DocumentKey, MutableDocument> remoteDocuments =
         remoteDocumentCache.getAllDocumentsMatchingQuery(query, offset);
+    Map<DocumentKey, Pair<Integer, Mutation>> overlays =
+        documentOverlayCache.getOverlaysWithBatchId(query.getPath(), offset.getLargestBatchId());
 
     // As documents might match the query because of their overlay we need to include all documents
     // in the result.
     remoteDocuments = updateRemoteDocumentsWithOverlayDocuments(remoteDocuments, overlays.keySet());
 
     // Apply the overlays and match against the query.
-    ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
+    Map<Document, Integer> results = new HashMap<>();
     for (Map.Entry<DocumentKey, MutableDocument> docEntry : remoteDocuments) {
-      Mutation overlay = overlays.get(docEntry.getKey());
+      // Look the pair up once and null-check it: documents that only exist in the remote
+      // document cache have no overlay entry, so dereferencing .first directly would NPE.
+      Pair<Integer, Mutation> batchIdAndOverlay = overlays.get(docEntry.getKey());
+      Mutation overlay = batchIdAndOverlay == null ? null : batchIdAndOverlay.second;
       if (overlay != null) {
         overlay.applyToLocalView(docEntry.getValue(), null, Timestamp.now());
       }
       // Finally, insert the documents that still match the query
-      if (query.matches(docEntry.getValue())) {
-        results = results.insert(docEntry.getKey(), docEntry.getValue());
+      if (query.matches(docEntry.getValue()) && batchIdAndOverlay != null) {
+        // Documents without a pending overlay were already indexed by the read-time pass.
+        results.put(docEntry.getValue(), batchIdAndOverlay.first);
       }
     }
 
+    return results;
+  }
```

**Reviewer:** I think we should do this once at the end of … FWIW, we might not need to do this at all if we pass a collection group here instead of the query. None of the IndexBackfill methods need a query.
```diff
@@ -105,4 +105,32 @@ public Map<DocumentKey, Mutation> getOverlays(ResourcePath collection, int sinceBatchId) {
 
     return result;
   }
+
+  @Override
+  public Map<DocumentKey, Pair<Integer, Mutation>> getOverlaysWithBatchId(
+      ResourcePath collection, int sinceBatchId) {
+    Map<DocumentKey, Pair<Integer, Mutation>> result = new HashMap<>();
+
+    int immediateChildrenPathLength = collection.length() + 1;
+    DocumentKey prefix = DocumentKey.fromPath(collection.append(""));
+    Map<DocumentKey, Pair<Integer, Mutation>> view = overlays.tailMap(prefix);
+
+    for (Map.Entry<DocumentKey, Pair<Integer, Mutation>> entry : view.entrySet()) {
+      DocumentKey key = entry.getKey();
+      if (!collection.isPrefixOf(key.getPath())) {
+        break;
+      }
+      // Documents from sub-collections
+      if (key.getPath().length() != immediateChildrenPathLength) {
+        continue;
+      }
+
+      Pair<Integer, Mutation> batchIdToOverlay = entry.getValue();
+      if (batchIdToOverlay.first > sinceBatchId) {
+        result.put(entry.getKey(), batchIdToOverlay);
+      }
+    }
+
+    return result;
+  }
 }
```

**Reviewer:** Please find a way to remove this duplication, even if we do not add an Overlay class.

**Author:** Combined the two methods into a single one.
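The in-memory implementation relies on a sorted-map prefix scan: `tailMap` jumps to the first key at or after the collection path, the loop breaks once keys leave the collection, and a path-length check filters out sub-collection documents. The same pattern in self-contained form, with plain strings in place of DocumentKey (paths and values invented):

```java
import java.util.Map;
import java.util.TreeMap;

public class PrefixScanDemo {
  public static void main(String[] args) {
    TreeMap<String, Integer> overlays = new TreeMap<>();
    overlays.put("rooms/a/messages/m1", 3);
    overlays.put("rooms/a/messages/m2/replies/r1", 4); // sub-collection document
    overlays.put("rooms/b/messages/m3", 5);

    String prefix = "rooms/a/messages/";
    int immediateChildDepth = 4; // segments in an immediate child path

    for (Map.Entry<String, Integer> e : overlays.tailMap(prefix).entrySet()) {
      if (!e.getKey().startsWith(prefix)) {
        break; // keys are sorted, so we have left the collection
      }
      if (e.getKey().split("/").length != immediateChildDepth) {
        continue; // skip documents from sub-collections
      }
      System.out.println(e.getKey() + " -> batch " + e.getValue()); // m1 -> batch 3
    }
  }
}
```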
```diff
@@ -16,6 +16,7 @@
 
 import static com.google.firebase.firestore.util.Assert.fail;
 
+import android.util.Pair;
 import androidx.annotation.Nullable;
 import com.google.firebase.firestore.auth.User;
 import com.google.firebase.firestore.model.DocumentKey;
@@ -111,4 +112,34 @@ public Map<DocumentKey, Mutation> getOverlays(ResourcePath collection, int sinceBatchId) {
 
     return result;
   }
+
+  @Override
+  public Map<DocumentKey, Pair<Integer, Mutation>> getOverlaysWithBatchId(
+      ResourcePath collection, int sinceBatchId) {
+    String collectionPath = EncodedPath.encode(collection);
+
+    Map<DocumentKey, Pair<Integer, Mutation>> result = new HashMap<>();
+    db.query(
+            "SELECT document_id, overlay_mutation, largest_batch_id FROM document_overlays "
+                + "WHERE uid = ? AND collection_path = ? AND largest_batch_id > ?")
+        .binding(uid, collectionPath, sinceBatchId)
+        .forEach(
+            row -> {
+              try {
+                String documentId = row.getString(0);
+                Write write = Write.parseFrom(row.getBlob(1));
+                int largestBatchId = row.getInt(2);
+                Mutation mutation = serializer.decodeMutation(write);
+
+                result.put(
+                    DocumentKey.fromPath(collection.append(documentId)),
+                    new Pair<>(largestBatchId, mutation));
+              } catch (InvalidProtocolBufferException e) {
+                throw fail("Overlay failed to parse: %s", e);
+              }
+            });
+
+    return result;
+  }
 }
```

**Reviewer:** Same comment as above.

**Author:** Done.
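Finally, a hedged usage sketch of the new interface method. The cache instance and the collection path are assumptions; `ResourcePath.fromString`, `DocumentKey`, and `Pair` are the real types used above:

```java
// Assumes an already-constructed DocumentOverlayCache named overlayCache.
// Fetch every overlay under "users" written after batch 5, then read out the
// largest batch id alongside each mutation.
Map<DocumentKey, Pair<Integer, Mutation>> overlays =
    overlayCache.getOverlaysWithBatchId(ResourcePath.fromString("users"), /* sinceBatchId= */ 5);
for (Map.Entry<DocumentKey, Pair<Integer, Mutation>> entry : overlays.entrySet()) {
  int largestBatchId = entry.getValue().first;
  Mutation mutation = entry.getValue().second;
  // The backfiller can now index entry.getKey() and advance its offset past largestBatchId.
}
```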