Add mutation batch id to index backfill #3202

Closed · wants to merge 20 commits
Changes from 2 commits
@@ -20,6 +20,7 @@
import com.google.auto.value.AutoValue;
import com.google.firebase.firestore.model.DocumentKey;
import com.google.firebase.firestore.util.Util;
import java.util.Comparator;

/** Represents an index entry saved by the SDK in its local storage. */
@AutoValue
@@ -53,4 +54,9 @@ public int compareTo(IndexEntry other) {

return nullSafeCompare(getArrayValue(), other.getArrayValue(), Util::compareByteArrays);
}

public static final Comparator<IndexEntry> SEMANTIC_COMPARATOR =
(left, right) -> {
return left.compareTo(right);
};
}
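
The lambda above is equivalent to a method reference, which is the more idiomatic spelling (behavior is identical, since compareTo already defines the semantic order):

  public static final Comparator<IndexEntry> SEMANTIC_COMPARATOR = IndexEntry::compareTo;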
@@ -14,6 +14,7 @@

package com.google.firebase.firestore.local;

import android.util.Pair;
import androidx.annotation.Nullable;
import com.google.firebase.firestore.model.DocumentKey;
import com.google.firebase.firestore.model.ResourcePath;
@@ -54,4 +55,14 @@ public interface DocumentOverlayCache {
* change past `sinceBatchId` are returned.
*/
Map<DocumentKey, Mutation> getOverlays(ResourcePath collection, int sinceBatchId);

/**
* Returns a mapping of overlays containing the largest batch id for each overlay mutation.
*
* @param collection The collection path to get the overlays for.
* @param sinceBatchId The minimum batch ID to filter by (exclusive). Only overlays that contain a
* change past `sinceBatchId` are returned.
*/
Map<DocumentKey, Pair<Integer, Mutation>> getOverlaysWithBatchId(
ResourcePath collection, int sinceBatchId);
Contributor:

I am surprised we don't have a high-level class that contains both the mutation and the batch ID for Overlays. We should probably create one and combine this method with the existing method just above. (cc @wu-hui)

Author:

Renamed this method to the original getOverlays() and changed the logic to support Pair.

}
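
A minimal sketch of the higher-level class the reviewer asks for, pairing the batch id with its mutation; the name Overlay and its exact shape are assumptions here, not part of this diff (the AutoValue pattern follows IndexEntry above):

  @AutoValue
  public abstract class Overlay {
    public static Overlay create(int largestBatchId, Mutation mutation) {
      return new AutoValue_Overlay(largestBatchId, mutation);
    }

    // The largest batch id of any mutation contained in the overlay.
    public abstract int getLargestBatchId();

    public abstract Mutation getMutation();
  }

With such a class, getOverlays and getOverlaysWithBatchId collapse into one method returning Map<DocumentKey, Overlay>.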
@@ -137,23 +137,56 @@ private int writeIndexEntries(LocalDocumentsView localDocumentsView) {

/** Writes entries for the fetched field indexes. */
private int writeEntriesForCollectionGroup(
- LocalDocumentsView localDocumentsView, String collectionGroup, int entriesRemainingUnderCap) {
+ LocalDocumentsView localDocumentsView,
+ String collectionGroup,
+ int documentsRemainingUnderCap) {
int documentsProcessed = 0;
Query query = new Query(ResourcePath.EMPTY, collectionGroup);

// Use the earliest offset of all field indexes to query the local cache.
- IndexOffset existingOffset = getExistingOffset(indexManager.getFieldIndexes(collectionGroup));
+ Collection<FieldIndex> fieldIndexes = indexManager.getFieldIndexes(collectionGroup);
+ IndexOffset existingOffset = getExistingOffset(fieldIndexes);

// TODO(indexing): Use limit queries to only fetch the required number of entries.
// TODO(indexing): Support mutation batch Ids when sorting and writing indexes.
ImmutableSortedMap<DocumentKey, Document> documents =
localDocumentsView.getDocumentsMatchingQuery(query, existingOffset);

- List<Document> oldestDocuments = getOldestDocuments(documents, entriesRemainingUnderCap);
+ List<Document> oldestDocuments = getOldestDocuments(documents, documentsRemainingUnderCap);
indexManager.updateIndexEntries(oldestDocuments);
documentsProcessed += oldestDocuments.size();

IndexOffset newOffset = getNewOffset(oldestDocuments, existingOffset);

// Start indexing mutations only after catching up for read time. This must be done separately
// since the backfill's first pass is in read time order rather than mutation batch id order.
int documentsRemaining = documentsRemainingUnderCap - oldestDocuments.size();
Contributor:

Can you move this to a helper method for better encapsulation? We might also want to do the same for the RemoteDocumentCache code.

Author:

Moved into LocalDocumentsView

if (documentsRemaining > 0) {
int earliestBatchId = getEarliestBatchId(fieldIndexes);
Contributor:

I was hoping we would have one existing offset for both the scan of the RDC and the mutations - but I now realize that is not the case. We could change getExistingOffset() to return an offset with the lowest SnapshotVersion+Key as well as the lowest batch ID (which might be from a different index). What do you think?

Author:

Done, moved into LocalDocumentsView.

newOffset =
IndexOffset.create(newOffset.getReadTime(), newOffset.getDocumentKey(), earliestBatchId);
Map<Document, Integer> documentToBatchId =
localDocumentsView.getDocumentsBatchIdsMatchingCollectionGroupQuery(query, newOffset);

// Write index entries for documents touched by local mutations and update the offset to the
// new largest batch id.
if (!documentToBatchId.isEmpty()) {
List<Document> documentsByBatchId =
getDocumentsByBatchId(documentToBatchId, documentsRemaining);

// Largest batch id is the last element in documentsByBatchId since it's sorted in ascending
// order by batch id.
int newLargestBatchId =
documentToBatchId.get(documentsByBatchId.get(documentsByBatchId.size() - 1));
newOffset =
Contributor:

Can you move this into getNewOffset? (as a final call at the end of the method)

Author:

N/A after moving into LocalDocumentsView.

IndexOffset.create(
newOffset.getReadTime(), newOffset.getDocumentKey(), newLargestBatchId);
indexManager.updateIndexEntries(documentsByBatchId);
Contributor:

If I follow correctly, we are now calling this twice for a document if a document is returned from both RDC and the mutation queue. My hunch is that we may even need to move all mutation ID processing to getDocumentsMatchingQuery in LocalDocumentsView to not scan the collections twice.

Author:

Added a set in LDV to track document keys and a test to check we only index a document once, even if it appears in mutations and RDC.

documentsProcessed += documentsByBatchId.size();
}
}

indexManager.updateCollectionGroup(collectionGroup, newOffset);
- return oldestDocuments.size();
+ return documentsProcessed;
}
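
To make the two-pass order concrete, here is a compressed sketch of how the offset advances, using only names from this diff (illustrative, not SDK source):

  // Pass 1: catch up by read time over the remote document cache.
  IndexOffset offset = getExistingOffset(fieldIndexes);
  List<Document> byReadTime = getOldestDocuments(documents, documentsRemainingUnderCap);
  offset = getNewOffset(byReadTime, offset); // advances readTime/documentKey only

  // Pass 2: catch up by mutation batch id over the overlays, keeping pass 1's read time.
  offset = IndexOffset.create(offset.getReadTime(), offset.getDocumentKey(), earliestBatchId);
  // ...index the mutation-touched documents, then record the largest batch id seen:
  offset = IndexOffset.create(offset.getReadTime(), offset.getDocumentKey(), newLargestBatchId);

The passes cannot be merged into one cursor because read-time order and batch-id order are independent: a document with an old read time may carry a brand-new mutation.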

/** Returns the lowest offset for the provided index group. */
@@ -169,7 +202,8 @@ private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
}

/**
- * Returns the offset for the index based on the newly indexed documents.
+ * Returns the read time and document key offset for the index based on the newly indexed
+ * documents.
*
* @param documents a list of documents sorted by read time and key (ascending)
* @param currentOffset the current offset of the index group
@@ -201,6 +235,40 @@ private List<Document> getOldestDocuments(
return oldestDocuments.subList(0, Math.min(count, oldestDocuments.size()));
}

/**
* Returns up to {@code count} documents sorted by batch id ascending, except in cases where the
* mutation batch id contains more documents. This method will always return all documents in a
Contributor:

It sounds like all bets are off when the batch ID contains more documents - maybe rewrite the intro to make it more clear that this only applies to the count.

Author:

removed method after refactoring into LDV.

* mutation batch, even if it goes over the limit.
*
* <p>For example, if count = 2, and we have: (docA, batch1), (docB, batch2), and (docC, batch2),
* all three documents will be returned since docC is also part of batch2.
*/
private List<Document> getDocumentsByBatchId(
Map<Document, Integer> documentToBatchId, int count) {
List<Document> oldestDocuments = new ArrayList<>(documentToBatchId.keySet());
Collections.sort(
oldestDocuments, (l, r) -> documentToBatchId.get(l).compareTo(documentToBatchId.get(r)));
int i = Math.min(count, oldestDocuments.size()) - 1;

// Include all documents that match the last document's batch id.
int lastBatchId = documentToBatchId.get(oldestDocuments.get(i));
while (i < oldestDocuments.size()
&& lastBatchId == documentToBatchId.get(oldestDocuments.get(i))) {
++i;
}
return oldestDocuments.subList(0, i);
}
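
The batch-boundary rule in the javadoc can be checked with a small standalone example (plain Java, no SDK types; note that the caller above guards against an empty map, which this logic requires):

  import java.util.ArrayList;
  import java.util.Comparator;
  import java.util.LinkedHashMap;
  import java.util.List;
  import java.util.Map;

  class BatchCutDemo {
    public static void main(String[] args) {
      Map<String, Integer> docToBatch = new LinkedHashMap<>();
      docToBatch.put("docA", 1);
      docToBatch.put("docB", 2);
      docToBatch.put("docC", 2);

      int count = 2;
      List<String> docs = new ArrayList<>(docToBatch.keySet());
      docs.sort(Comparator.comparing(docToBatch::get));

      int i = Math.min(count, docs.size()) - 1; // would be -1 on an empty map
      int lastBatchId = docToBatch.get(docs.get(i));
      // Walk forward while documents share the cut-off batch id.
      while (i < docs.size() && lastBatchId == docToBatch.get(docs.get(i))) {
        ++i;
      }
      // Prints [docA, docB, docC]: docC rides along because it shares batch 2 with docB.
      System.out.println(docs.subList(0, i));
    }
  }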

/** Returns the earliest batch id from the specified field indexes. */
private int getEarliestBatchId(Collection<FieldIndex> fieldIndexes) {
int lowestBatchId = (int) Double.POSITIVE_INFINITY;
for (FieldIndex fieldIndex : fieldIndexes) {
lowestBatchId =
Math.min(fieldIndex.getIndexState().getOffset().getLargestBatchId(), lowestBatchId);
}
return lowestBatchId;
}
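
One readability note: in Java, narrowing Double.POSITIVE_INFINITY to int yields Integer.MAX_VALUE (JLS §5.1.3), so the initialization above is correct but states its intent indirectly. The direct equivalent:

  int lowestBatchId = Integer.MAX_VALUE; // same value as (int) Double.POSITIVE_INFINITY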

@VisibleForTesting
void setMaxDocumentsToProcess(int newMax) {
maxDocumentsToProcess = newMax;
@@ -17,6 +17,7 @@
import static com.google.firebase.firestore.model.DocumentCollections.emptyDocumentMap;
import static com.google.firebase.firestore.util.Assert.hardAssert;

import android.util.Pair;
import androidx.annotation.VisibleForTesting;
import com.google.firebase.Timestamp;
import com.google.firebase.database.collection.ImmutableSortedMap;
@@ -316,7 +317,31 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
}
}

- /** Queries the remote documents and overlays by doing a full collection scan. */
+ /**
+ * Returns a mapping of documents that match the provided query and offset mapped to the
+ * corresponding batch id.
+ */
// TODO(indexing): Support adding local mutations to collection group table.
Contributor:

Author:

Oh, I see now that the ignored test is failing because the mutations are only being added to the overlay cache, not through the normal flow. If that's the case, can we delete the ignored test case? (The other option is to add the mutation into the mutation queue as well.)

public Map<Document, Integer> getDocumentsBatchIdsMatchingCollectionGroupQuery(
Contributor:

Your code would be cleaner if this method returned Map<BatchId, List<Document>>.

The name is also quite a mouthful - and the method should probably just accept a String collectionGroup rather than a query as it ignores all other query constraints.

Author:

Done, but cannot think of a cleaner name.

Query query, IndexOffset offset) {
hardAssert(
query.getPath().isEmpty(),
"Currently we only support collection group queries at the root.");
String collectionId = query.getCollectionGroup();
Map<Document, Integer> results = new HashMap<>();
List<ResourcePath> parents = indexManager.getCollectionParents(collectionId);

// Perform a collection query against each parent that contains the collectionId and
// aggregate the results.
for (ResourcePath parent : parents) {
Query collectionQuery = query.asCollectionQueryAtPath(parent.append(collectionId));
Map<Document, Integer> documentToBatchIds =
getDocumentsMatchingCollectionQueryWithBatchId(collectionQuery, offset);
results.putAll(documentToBatchIds);
}
return results;
}
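
For reference, the reviewer's suggested Map<BatchId, List<Document>> shape can be derived from this result with a small regrouping helper (hypothetical code, not part of this diff; assumes java.util.TreeMap and ArrayList):

  private Map<Integer, List<Document>> groupByBatchId(Map<Document, Integer> documentToBatchId) {
    // TreeMap keeps batches in ascending id order, which also simplifies the
    // batch-boundary logic in getDocumentsByBatchId.
    Map<Integer, List<Document>> byBatchId = new TreeMap<>();
    for (Map.Entry<Document, Integer> entry : documentToBatchId.entrySet()) {
      byBatchId.computeIfAbsent(entry.getValue(), id -> new ArrayList<>()).add(entry.getKey());
    }
    return byBatchId;
  }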

private ImmutableSortedMap<DocumentKey, Document>
getDocumentsMatchingCollectionQueryFromOverlayCache(Query query, IndexOffset offset) {
ImmutableSortedMap<DocumentKey, MutableDocument> remoteDocuments =
@@ -325,27 +350,65 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection

// As documents might match the query because of their overlay we need to include all documents
// in the result.
remoteDocuments = updateRemoteDocumentsWithOverlayDocuments(remoteDocuments, overlays.keySet());

// Apply the overlays and match against the query.
ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
for (Map.Entry<DocumentKey, MutableDocument> docEntry : remoteDocuments) {
Mutation overlay = overlays.get(docEntry.getKey());
if (overlay != null) {
overlay.applyToLocalView(docEntry.getValue(), null, Timestamp.now());
}
// Finally, insert the documents that still match the query
if (query.matches(docEntry.getValue())) {
results = results.insert(docEntry.getKey(), docEntry.getValue());
}
}

return results;
}

/** Updates the provided remote documents to include documents in the overlay. */
private ImmutableSortedMap<DocumentKey, MutableDocument>
Author:

I tried refactoring out shared code between getDocumentsMatchingCollectionQueryFromOverlayCache and getDocumentsMatchingCollectionQueryWithBatchId, but I think it can probably be cleaner.

Contributor:

We are probably at a point where we should just remove the non-overlay code.

@wu-hui

updateRemoteDocumentsWithOverlayDocuments(
ImmutableSortedMap<DocumentKey, MutableDocument> remoteDocuments,
Set<DocumentKey> overlayDocumentKeys) {
Set<DocumentKey> missingDocuments = new HashSet<>();
- for (Map.Entry<DocumentKey, Mutation> entry : overlays.entrySet()) {
- if (!remoteDocuments.containsKey(entry.getKey())) {
- missingDocuments.add(entry.getKey());
+ for (DocumentKey key : overlayDocumentKeys) {
+ if (!remoteDocuments.containsKey(key)) {
+ missingDocuments.add(key);
}
}
for (Map.Entry<DocumentKey, MutableDocument> entry :
remoteDocumentCache.getAll(missingDocuments).entrySet()) {
remoteDocuments = remoteDocuments.insert(entry.getKey(), entry.getValue());
}
return remoteDocuments;
}

/** Queries the remote documents and overlays by doing a full collection scan. */
public Map<Document, Integer> getDocumentsMatchingCollectionQueryWithBatchId(
Query query, IndexOffset offset) {
ImmutableSortedMap<DocumentKey, MutableDocument> remoteDocuments =
remoteDocumentCache.getAllDocumentsMatchingQuery(query, offset);
Map<DocumentKey, Pair<Integer, Mutation>> overlays =
documentOverlayCache.getOverlaysWithBatchId(query.getPath(), offset.getLargestBatchId());

// As documents might match the query because of their overlay we need to include all documents
// in the result.
remoteDocuments = updateRemoteDocumentsWithOverlayDocuments(remoteDocuments, overlays.keySet());

// Apply the overlays and match against the query.
- ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
+ Map<Document, Integer> results = new HashMap<>();
for (Map.Entry<DocumentKey, MutableDocument> docEntry : remoteDocuments) {
- Mutation overlay = overlays.get(docEntry.getKey());
+ int batchId = overlays.get(docEntry.getKey()).first;
+ Mutation overlay = overlays.get(docEntry.getKey()).second;
if (overlay != null) {
overlay.applyToLocalView(docEntry.getValue(), null, Timestamp.now());
}
// Finally, insert the documents that still match the query
if (query.matches(docEntry.getValue())) {
Contributor:

I think we should do this once at the end of getNextDocumentsAndOffset.

FWIW, we might not need to do this at all if we pass a collection group here instead of the query. None of the IndexBackfill methods need a query.

- results = results.insert(docEntry.getKey(), docEntry.getValue());
+ results.put(docEntry.getValue(), batchId);
}
}

@@ -105,4 +105,32 @@ public Map<DocumentKey, Mutation> getOverlays(ResourcePath collection, int since

return result;
}

@Override
public Map<DocumentKey, Pair<Integer, Mutation>> getOverlaysWithBatchId(
Contributor:

Please find a way to remove this duplication - even if we do not add an Overlay class.

Author:

Combined the two methods into a single one.

ResourcePath collection, int sinceBatchId) {
Map<DocumentKey, Pair<Integer, Mutation>> result = new HashMap<>();

int immediateChildrenPathLength = collection.length() + 1;
DocumentKey prefix = DocumentKey.fromPath(collection.append(""));
Map<DocumentKey, Pair<Integer, Mutation>> view = overlays.tailMap(prefix);

for (Map.Entry<DocumentKey, Pair<Integer, Mutation>> entry : view.entrySet()) {
DocumentKey key = entry.getKey();
if (!collection.isPrefixOf(key.getPath())) {
break;
}
// Documents from sub-collections
if (key.getPath().length() != immediateChildrenPathLength) {
continue;
}

Pair<Integer, Mutation> batchIdToOverlay = entry.getValue();
if (batchIdToOverlay.first > sinceBatchId) {
result.put(entry.getKey(), batchIdToOverlay);
}
}

return result;
}
}
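
Once the two methods are combined as described in the thread, the mutation-only view is derivable from the Pair-valued result in a single pass, so the memory and SQLite implementations each need only one scan (a sketch under that assumption, not this diff's code):

  // Hypothetical: derive the old Map<DocumentKey, Mutation> view from the combined result.
  Map<DocumentKey, Mutation> mutationsOnly = new HashMap<>();
  for (Map.Entry<DocumentKey, Pair<Integer, Mutation>> e :
      getOverlaysWithBatchId(collection, sinceBatchId).entrySet()) {
    mutationsOnly.put(e.getKey(), e.getValue().second);
  }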
@@ -16,6 +16,7 @@

import static com.google.firebase.firestore.util.Assert.fail;

import android.util.Pair;
import androidx.annotation.Nullable;
import com.google.firebase.firestore.auth.User;
import com.google.firebase.firestore.model.DocumentKey;
@@ -111,4 +112,34 @@ public Map<DocumentKey, Mutation> getOverlays(ResourcePath collection, int since

return result;
}

@Override
public Map<DocumentKey, Pair<Integer, Mutation>> getOverlaysWithBatchId(
Contributor:

Same comment as above.

Author:

Done.

ResourcePath collection, int sinceBatchId) {
String collectionPath = EncodedPath.encode(collection);

Map<DocumentKey, Pair<Integer, Mutation>> result = new HashMap<>();

db.query(
"SELECT document_id, overlay_mutation, largest_batch_id FROM document_overlays "
+ "WHERE uid = ? AND collection_path = ? AND largest_batch_id > ?")
.binding(uid, collectionPath, sinceBatchId)
.forEach(
row -> {
try {
String documentId = row.getString(0);
Write write = Write.parseFrom(row.getBlob(1));
int largestBatchId = row.getInt(2);
Mutation mutation = serializer.decodeMutation(write);

result.put(
DocumentKey.fromPath(collection.append(documentId)),
new Pair(largestBatchId, mutation));
} catch (InvalidProtocolBufferException e) {
throw fail("Overlay failed to parse: %s", e);
}
});

return result;
}
}
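
A minor detail in the SQLite variant: new Pair(largestBatchId, mutation) uses the raw type and compiles with an unchecked warning. android.util.Pair has a typed factory, so the equivalent warning-free form is:

  result.put(
      DocumentKey.fromPath(collection.append(documentId)),
      Pair.create(largestBatchId, mutation)); // or: new Pair<>(largestBatchId, mutation)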
@@ -22,6 +22,7 @@
import static java.lang.Math.max;

import androidx.annotation.Nullable;
import androidx.annotation.VisibleForTesting;
import com.google.firebase.Timestamp;
import com.google.firebase.firestore.auth.User;
import com.google.firebase.firestore.core.Bound;
@@ -105,7 +106,8 @@ public void start() {
// Fetch all index states if persisted for the user. These states contain per user information
// on how up to date the index is.
db.query(
"SELECT index_id, sequence_number, read_time_seconds, read_time_nanos, document_key "
"SELECT index_id, sequence_number, read_time_seconds, read_time_nanos, "
+ "document_key, largest_batch_id "
+ "FROM index_state WHERE uid = ?")
.binding(uid)
.forEach(
@@ -116,8 +118,10 @@
new SnapshotVersion(new Timestamp(row.getLong(2), row.getInt(3)));
DocumentKey documentKey =
DocumentKey.fromPath(EncodedPath.decodeResourcePath(row.getString(4)));
- indexStates.put(
- indexId, FieldIndex.IndexState.create(sequenceNumber, readTime, documentKey));
+ int largestBatchId = row.getInt(5);
+ FieldIndex.IndexOffset offset =
+ FieldIndex.IndexOffset.create(readTime, documentKey, largestBatchId);
+ indexStates.put(indexId, FieldIndex.IndexState.create(sequenceNumber, offset));
});

// Fetch all indices and combine with user's index state if available.
@@ -247,6 +251,7 @@ private void updateEntries(
diffCollections(
existingEntries,
newEntries,
IndexEntry.SEMANTIC_COMPARATOR,
entry -> addIndexEntry(document, entry),
entry -> deleteIndexEntry(document, entry));
}
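
For intuition about what diffCollections does with the new comparator, here is a generic merge-walk over two sorted sets (illustrative only; the SDK's Util.diffCollections may differ in signature; assumes java.util and java.util.function imports):

  static <T> void diffSortedCollections(
      SortedSet<T> existing,
      SortedSet<T> target,
      Comparator<T> comparator,
      Consumer<T> onAdd,
      Consumer<T> onRemove) {
    Iterator<T> a = existing.iterator();
    Iterator<T> b = target.iterator();
    T left = a.hasNext() ? a.next() : null;
    T right = b.hasNext() ? b.next() : null;
    while (left != null || right != null) {
      int cmp = left == null ? 1 : right == null ? -1 : comparator.compare(left, right);
      if (cmp < 0) {
        onRemove.accept(left); // only in the existing entries: delete from the index
        left = a.hasNext() ? a.next() : null;
      } else if (cmp > 0) {
        onAdd.accept(right); // only in the new entries: add to the index
        right = b.hasNext() ? b.next() : null;
      } else {
        left = a.hasNext() ? a.next() : null; // present in both: nothing to do
        right = b.hasNext() ? b.next() : null;
      }
    }
  }

Passing SEMANTIC_COMPARATOR explicitly matters because the walk is only correct if both sets are ordered by the same comparison that decides entry equality.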
@@ -343,8 +348,8 @@ private void deleteIndexEntry(Document document, IndexEntry indexEntry) {
document.getKey().toString());
}

- private SortedSet<IndexEntry> getExistingIndexEntries(
- DocumentKey documentKey, FieldIndex fieldIndex) {
+ @VisibleForTesting
+ SortedSet<IndexEntry> getExistingIndexEntries(DocumentKey documentKey, FieldIndex fieldIndex) {
SortedSet<IndexEntry> results = new TreeSet<>();
db.query(
"SELECT array_value, directional_value FROM index_entries "