Add mutation batch id to index backfill #3202

Closed · wants to merge 20 commits
DocumentOverlayCache.java
@@ -18,6 +18,7 @@
 import com.google.firebase.firestore.model.DocumentKey;
 import com.google.firebase.firestore.model.ResourcePath;
 import com.google.firebase.firestore.model.mutation.Mutation;
+import com.google.firebase.firestore.model.mutation.Overlay;
 import java.util.Map;
 
 /**
@@ -35,7 +36,7 @@ public interface DocumentOverlayCache {
    * for that key.
    */
   @Nullable
-  Mutation getOverlay(DocumentKey key);
+  Overlay getOverlay(DocumentKey key);
 
   /**
    * Saves the given document key to mutation map to persistence as overlays. All overlays will have
@@ -52,6 +53,7 @@ public interface DocumentOverlayCache {
    * @param collection The collection path to get the overlays for.
    * @param sinceBatchId The minimum batch ID to filter by (exclusive). Only overlays that contain a
    *     change past `sinceBatchId` are returned.
+   * @return Mapping of each document key in the collection to its overlay.
    */
-  Map<DocumentKey, Mutation> getOverlays(ResourcePath collection, int sinceBatchId);
+  Map<DocumentKey, Overlay> getOverlays(ResourcePath collection, int sinceBatchId);
 }
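
The Overlay type referenced above is new in this PR, but its definition is not part of the diff. Judging from the calls that do appear (getMutation() and getLargestBatchId()), a minimal sketch could look like the following; the constructor and field layout are assumptions for illustration, not the PR's actual implementation:

    package com.google.firebase.firestore.model.mutation;

    /**
     * Sketch of the Overlay wrapper: a mutation paired with the largest batch id that
     * contributed to it. Only getMutation() and getLargestBatchId() are confirmed by
     * this diff; everything else here is assumed.
     */
    public final class Overlay {
      private final int largestBatchId;
      private final Mutation mutation;

      public Overlay(int largestBatchId, Mutation mutation) {
        this.largestBatchId = largestBatchId;
        this.mutation = mutation;
      }

      public int getLargestBatchId() {
        return largestBatchId;
      }

      public Mutation getMutation() {
        return mutation;
      }
    }

Note that getOverlays filters by an exclusive minimum batch ID; callers later in this diff pass -1 to mean "all overlays", since real batch IDs are positive.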
IndexBackfiller.java
@@ -16,6 +16,7 @@
 
 import static com.google.firebase.firestore.util.Assert.hardAssert;
 
+import android.util.Pair;
 import androidx.annotation.Nullable;
 import androidx.annotation.VisibleForTesting;
 import com.google.firebase.database.collection.ImmutableSortedMap;
@@ -27,8 +28,6 @@
 import com.google.firebase.firestore.util.Logger;
 import java.util.Collection;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -45,15 +44,13 @@ public class IndexBackfiller {
 
   private final Scheduler scheduler;
   private final Persistence persistence;
-  private final RemoteDocumentCache remoteDocumentCache;
   private LocalDocumentsView localDocumentsView;
   private IndexManager indexManager;
   private int maxDocumentsToProcess = MAX_DOCUMENTS_TO_PROCESS;
 
   public IndexBackfiller(Persistence persistence, AsyncQueue asyncQueue) {
     this.persistence = persistence;
     this.scheduler = new Scheduler(asyncQueue);
-    this.remoteDocumentCache = persistence.getRemoteDocumentCache();
   }
 
   public void setLocalDocumentsView(LocalDocumentsView localDocumentsView) {
@@ -110,12 +107,11 @@ public Scheduler getScheduler() {
   public int backfill() {
     hardAssert(localDocumentsView != null, "setLocalDocumentsView() not called");
     hardAssert(indexManager != null, "setIndexManager() not called");
-    return persistence.runTransaction(
-        "Backfill Indexes", () -> writeIndexEntries(localDocumentsView));
+    return persistence.runTransaction("Backfill Indexes", this::writeIndexEntries);
   }
 
   /** Writes index entries until the cap is reached. Returns the number of documents processed. */
-  private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
+  private int writeIndexEntries() {
     Set<String> processedCollectionGroups = new HashSet<>();
     int documentsRemaining = maxDocumentsToProcess;
     while (documentsRemaining > 0) {
@@ -124,58 +120,52 @@ private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
         break;
       }
       Logger.debug(LOG_TAG, "Processing collection: %s", collectionGroup);
-      documentsRemaining -=
-          writeEntriesForCollectionGroup(localDocumentsView, collectionGroup, documentsRemaining);
+      documentsRemaining -= writeEntriesForCollectionGroup(collectionGroup, documentsRemaining);
       processedCollectionGroups.add(collectionGroup);
     }
     return maxDocumentsToProcess - documentsRemaining;
   }
 
-  /** Writes entries for the fetched field indexes. */
+  /**
+   * Writes entries for the provided collection group. Returns the number of documents processed.
+   */
   private int writeEntriesForCollectionGroup(
-      LocalDocumentsView localDocumentsView, String collectionGroup, int entriesRemainingUnderCap) {
-    // TODO(indexing): Support mutation batch Ids when sorting and writing indexes.
-
+      String collectionGroup, int documentsRemainingUnderCap) {
     // Use the earliest offset of all field indexes to query the local cache.
-    IndexOffset existingOffset = getExistingOffset(indexManager.getFieldIndexes(collectionGroup));
-    ImmutableSortedMap<DocumentKey, Document> documents =
-        localDocumentsView.getDocuments(collectionGroup, existingOffset, entriesRemainingUnderCap);
-    indexManager.updateIndexEntries(documents);
+    Collection<FieldIndex> fieldIndexes = indexManager.getFieldIndexes(collectionGroup);
+    IndexOffset existingOffset = getExistingOffset(fieldIndexes);
 
-    IndexOffset newOffset = getNewOffset(documents, existingOffset);
-    indexManager.updateCollectionGroup(collectionGroup, newOffset);
+    // Fetch the next documents to index, together with the offset that results from
+    // processing them.
+    Pair<IndexOffset, ImmutableSortedMap<DocumentKey, Document>> pair =
+        localDocumentsView.getNextDocumentsAndOffset(
+            collectionGroup, existingOffset, documentsRemainingUnderCap);
+    IndexOffset newOffset = pair.first;
+    ImmutableSortedMap<DocumentKey, Document> documentsToIndex = pair.second;
 
-    return documents.size();
+    indexManager.updateIndexEntries(documentsToIndex);
+    indexManager.updateCollectionGroup(collectionGroup, newOffset);
+    return documentsToIndex.size();
   }
 
   /** Returns the lowest offset for the provided index group. */
   private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
     IndexOffset lowestOffset = null;
+    int lowestBatchId = Integer.MAX_VALUE;
     for (FieldIndex fieldIndex : fieldIndexes) {
       if (lowestOffset == null
           || fieldIndex.getIndexState().getOffset().compareTo(lowestOffset) < 0) {
         lowestOffset = fieldIndex.getIndexState().getOffset();
       }
+      lowestBatchId =
+          Math.min(fieldIndex.getIndexState().getOffset().getLargestBatchId(), lowestBatchId);
     }
-    return lowestOffset == null ? IndexOffset.NONE : lowestOffset;
-  }
+    lowestOffset = lowestOffset == null ? IndexOffset.NONE : lowestOffset;
 
-  /** Returns the offset for the index based on the newly indexed documents. */
-  private IndexOffset getNewOffset(
-      ImmutableSortedMap<DocumentKey, Document> documents, IndexOffset currentOffset) {
-    if (documents.isEmpty()) {
-      return IndexOffset.create(remoteDocumentCache.getLatestReadTime());
-    } else {
-      IndexOffset latestOffset = currentOffset;
-      Iterator<Map.Entry<DocumentKey, Document>> it = documents.iterator();
-      while (it.hasNext()) {
-        IndexOffset newOffset = IndexOffset.fromDocument(it.next().getValue());
-        if (newOffset.compareTo(latestOffset) > 0) {
-          latestOffset = newOffset;
-        }
-      }
-      return latestOffset;
-    }
-  }
+    // Attach the earliest batch id across all indexes to the offset.
+    lowestOffset =
+        IndexOffset.create(
+            lowestOffset.getReadTime(), lowestOffset.getDocumentKey(), lowestBatchId);
+    return lowestOffset;
+  }
 
   @VisibleForTesting
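
To make the new getExistingOffset behavior concrete, here is a hypothetical walk-through; the numbers and the version()/key() helpers are invented for illustration, while the IndexOffset accessors are the ones used in the diff above:

    // Index A: backfilled through readTime=5, and its offset has already seen batch 42.
    IndexOffset a = IndexOffset.create(version(5), key("coll/a"), /* largestBatchId= */ 42);
    // Index B: backfilled through readTime=10, but only through batch 7.
    IndexOffset b = IndexOffset.create(version(10), key("coll/b"), /* largestBatchId= */ 7);

    // The lowest offset by compareTo() supplies the read time and document key ...
    IndexOffset lowestOffset = a.compareTo(b) < 0 ? a : b; // -> a (readTime=5)
    // ... while the batch id is minimized independently across all indexes ...
    int lowestBatchId = Math.min(a.getLargestBatchId(), b.getLargestBatchId()); // -> 7
    // ... so backfill resumes from (readTime=5, batchId=7) and cannot skip a batch
    // that any index in the collection group still needs.
    IndexOffset existingOffset =
        IndexOffset.create(lowestOffset.getReadTime(), lowestOffset.getDocumentKey(), lowestBatchId);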
LocalDocumentsView.java
@@ -17,6 +17,7 @@
 import static com.google.firebase.firestore.model.DocumentCollections.emptyDocumentMap;
 import static com.google.firebase.firestore.util.Assert.hardAssert;
 
+import android.util.Pair;
 import androidx.annotation.VisibleForTesting;
 import com.google.firebase.Timestamp;
 import com.google.firebase.database.collection.ImmutableSortedMap;
@@ -29,9 +30,12 @@
 import com.google.firebase.firestore.model.mutation.FieldMask;
 import com.google.firebase.firestore.model.mutation.Mutation;
 import com.google.firebase.firestore.model.mutation.MutationBatch;
+import com.google.firebase.firestore.model.mutation.Overlay;
 import com.google.firebase.firestore.model.mutation.PatchMutation;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -88,14 +92,14 @@ IndexManager getIndexManager() {
    * for it.
    */
   Document getDocument(DocumentKey key) {
-    Mutation overlay = documentOverlayCache.getOverlay(key);
+    Overlay overlay = documentOverlayCache.getOverlay(key);
     // Only read from remote document cache if overlay is a patch.
     MutableDocument document =
-        (overlay == null || overlay instanceof PatchMutation)
+        (overlay == null || overlay.getMutation() instanceof PatchMutation)
             ? remoteDocumentCache.get(key)
             : MutableDocument.newInvalidDocument(key);
     if (overlay != null) {
-      overlay.applyToLocalView(document, null, Timestamp.now());
+      overlay.getMutation().applyToLocalView(document, null, Timestamp.now());
     }
 
     return document;
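
For background on the "only read from remote document cache if overlay is a patch" branch above: a PatchMutation needs the remote base document so that unpatched fields survive, whereas a set or delete fully determines the local view by itself, making the base-document read unnecessary. A sketch of the two cases (same APIs as in the method above; hypothetical standalone usage):

    Overlay overlay = documentOverlayCache.getOverlay(key);
    if (overlay != null && overlay.getMutation() instanceof PatchMutation) {
      // Patch: start from the remote base document; fields the patch does not
      // touch must come from the server's copy.
      MutableDocument base = remoteDocumentCache.get(key);
      overlay.getMutation().applyToLocalView(base, null, Timestamp.now());
    } else if (overlay != null) {
      // Set/delete: the mutation overwrites everything, so an empty document suffices.
      MutableDocument base = MutableDocument.newInvalidDocument(key);
      overlay.getMutation().applyToLocalView(base, null, Timestamp.now());
    }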
@@ -125,18 +129,18 @@ ImmutableSortedMap<DocumentKey, Document> getLocalViewOfDocuments(
     ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
     Map<DocumentKey, MutableDocument> recalculateDocuments = new HashMap<>();
     for (Map.Entry<DocumentKey, MutableDocument> entry : docs.entrySet()) {
-      Mutation overlay = documentOverlayCache.getOverlay(entry.getKey());
+      Overlay overlay = documentOverlayCache.getOverlay(entry.getKey());
       // Recalculate an overlay if the document's existence state is changed due to a remote
       // event *and* the overlay is a PatchMutation. This is because document existence state
       // can change if some patch mutation's preconditions are met.
       // NOTE: we recalculate when `overlay` is null as well, because there might be a patch
       // mutation whose precondition does not match before the change (hence overlay==null),
       // but would now match.
       if (existenceStateChanged.contains(entry.getKey())
-          && (overlay == null || overlay instanceof PatchMutation)) {
+          && (overlay == null || overlay.getMutation() instanceof PatchMutation)) {
         recalculateDocuments.put(entry.getKey(), docs.get(entry.getKey()));
       } else if (overlay != null) {
-        overlay.applyToLocalView(entry.getValue(), null, Timestamp.now());
+        overlay.getMutation().applyToLocalView(entry.getValue(), null, Timestamp.now());
       }
     }
@@ -195,7 +199,10 @@ void recalculateAndSaveOverlays(Set<DocumentKey> documentKeys) {
     recalculateAndSaveOverlays(docs);
   }
 
-  /** Gets the local view of the next {@code count} documents based on their read time. */
+  /**
+   * Gets the local view of the next {@code count} documents based on their read time. The
+   * documents are ordered by read time and key.
+   */
   ImmutableSortedMap<DocumentKey, Document> getDocuments(
       String collectionGroup, IndexOffset offset, int count) {
     Map<DocumentKey, MutableDocument> docs =
@@ -255,16 +262,112 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
     return results;
   }
 
-  /** Queries the remote documents and overlays by doing a full collection scan. */
+  /**
+   * Given a collection group, returns the next documents that follow the provided offset, along
+   * with an updated offset.
+   *
+   * <p>The documents returned by this method are ordered by remote version from the provided
+   * offset. If there are no more remote documents after the provided offset, documents with
+   * mutations in order of batch id from the offset are returned. Since all documents in a batch
+   * are returned together, the total number of documents returned can exceed {@code count}.
+   *
+   * <p>If no documents are found, returns an empty map and an offset with the latest read time in
+   * the remote document cache.
+   *
+   * @param collectionGroup The collection group for the documents.
+   * @param offset The offset to index into.
+   * @param count The number of documents to return.
+   * @return A pair containing the next offset that corresponds to the next documents and a map of
+   *     documents that follow the provided offset.
+   */
+  Pair<IndexOffset, ImmutableSortedMap<DocumentKey, Document>> getNextDocumentsAndOffset(
+      String collectionGroup, IndexOffset offset, int count) {
+    // First backfill based on the offset's read time.
+    ImmutableSortedMap<DocumentKey, Document> returnedDocuments =
+        getDocuments(collectionGroup, offset, count);
+    IndexOffset newOffset = getNewOffset(returnedDocuments, offset);
+
+    // Then backfill based on the offset's batch id if there is count remaining.
+    // TODO: combine the read time and batch id fetches into a single method
+    if (returnedDocuments.size() < count) {
+      int countRemaining = count - returnedDocuments.size();
+      Pair<IndexOffset, ImmutableSortedMap<DocumentKey, Document>> pair =
+          getDocumentsFromOverlay(collectionGroup, newOffset, returnedDocuments, countRemaining);
+      newOffset = pair.first;
+
+      for (Map.Entry<DocumentKey, Document> entry : pair.second) {
+        returnedDocuments = returnedDocuments.insert(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return new Pair<>(newOffset, returnedDocuments);
+  }
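
A worked example of the two-phase contract described in the javadoc above (all numbers invented):

    // Suppose count = 10, the remote document cache holds 4 documents newer than
    // `offset`, and local mutation batches 8 and 9 overlay 3 and 5 documents.
    //
    //   Phase 1 (read time): getDocuments() returns the 4 remote documents; the
    //                        offset's read-time component advances past them.
    //   Phase 2 (batch id):  6 slots remain. Batch 8 adds 3 documents; batch 9 is
    //                        then started and, because a batch is always returned
    //                        whole, all 5 of its documents are included. 12
    //                        documents are returned in total, exceeding `count`.
    //
    // The returned offset carries both the advanced read time and largestBatchId = 9.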

+  /**
+   * Returns the next documents that follow the provided offset's largest batch id, along with an
+   * updated offset.
+   *
+   * @param collectionGroup The collection group for the documents.
+   * @param offset The offset to index into.
+   * @param processedDocuments Already processed documents that should not be returned again by
+   *     this method.
+   * @param count The number of documents to return.
+   * @return A pair containing the next offset that corresponds to the next documents and a map of
+   *     documents that follow the provided offset's batch id.
+   */
+  private Pair<IndexOffset, ImmutableSortedMap<DocumentKey, Document>> getDocumentsFromOverlay(
+      String collectionGroup,
+      IndexOffset offset,
+      ImmutableSortedMap<DocumentKey, Document> processedDocuments,
+      int count) {
+    SQLiteDocumentOverlayCache cache = (SQLiteDocumentOverlayCache) documentOverlayCache;
+    ImmutableSortedMap<DocumentKey, Document> returnedDocuments = emptyDocumentMap();
+    IndexOffset newOffset = offset;
+    int newLargestBatchId = offset.getLargestBatchId();
+    int documentCount = 0;
+
+    while (documentCount < count) {
+      Map<DocumentKey, Overlay> overlays =
+          cache.getNextOverlays(collectionGroup, newLargestBatchId);
+      if (overlays.isEmpty()) {
+        break;
+      }
+
+      // Prune documents that are already in processedDocuments.
+      List<DocumentKey> documentsToFetch = new ArrayList<>();
+      for (Map.Entry<DocumentKey, Overlay> entry : overlays.entrySet()) {
+        if (!processedDocuments.containsKey(entry.getKey())) {
+          documentsToFetch.add(entry.getKey());
+          documentCount++;
+        }
+        newLargestBatchId = entry.getValue().getLargestBatchId();
+      }
+      newOffset =
+          IndexOffset.create(
+              newOffset.getReadTime(), newOffset.getDocumentKey(), newLargestBatchId);
+
+      // Fetch the remote documents and apply the mutations before adding them to the results map.
+      Map<DocumentKey, MutableDocument> remoteDocuments =
+          remoteDocumentCache.getAll(documentsToFetch);
+      for (Map.Entry<DocumentKey, MutableDocument> entry : remoteDocuments.entrySet()) {
+        Overlay overlay = overlays.get(entry.getKey());
+        overlay.getMutation().applyToLocalView(entry.getValue(), null, Timestamp.now());
+
+        returnedDocuments = returnedDocuments.insert(entry.getKey(), entry.getValue());
+      }
+    }
+    return new Pair<>(newOffset, returnedDocuments);
+  }
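
SQLiteDocumentOverlayCache.getNextOverlays is not part of this diff. From the way the loop above consumes it, and from the "all documents in a batch are returned together" guarantee in the javadoc, its contract appears to be: return every overlay belonging to the single smallest batch id greater than sinceBatchId, or an empty map when none remains. A minimal in-memory sketch of that assumed contract (the real version is a SQLite query; overlaysByKey here is a hypothetical field):

    /** In-memory stand-in for the assumed getNextOverlays contract. */
    Map<DocumentKey, Overlay> getNextOverlays(String collectionGroup, int sinceBatchId) {
      int nextBatchId = Integer.MAX_VALUE;
      Map<DocumentKey, Overlay> result = new HashMap<>();
      for (Map.Entry<DocumentKey, Overlay> entry : overlaysByKey.entrySet()) {
        if (!entry.getKey().getCollectionGroup().equals(collectionGroup)) {
          continue;
        }
        int batchId = entry.getValue().getLargestBatchId();
        if (batchId <= sinceBatchId || batchId > nextBatchId) {
          continue;
        }
        if (batchId < nextBatchId) {
          // Found an earlier batch; restart the result set with it.
          nextBatchId = batchId;
          result.clear();
        }
        result.put(entry.getKey(), entry.getValue());
      }
      return result;
    }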

   private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollectionQuery(
       Query query, IndexOffset offset) {
     Map<DocumentKey, MutableDocument> remoteDocuments =
         remoteDocumentCache.getAll(query.getPath(), offset);
-    Map<DocumentKey, Mutation> overlays = documentOverlayCache.getOverlays(query.getPath(), -1);
+    Map<DocumentKey, Overlay> overlays = documentOverlayCache.getOverlays(query.getPath(), -1);
 
     // As documents might match the query because of their overlay we need to include documents
     // for all overlays in the initial document set.
-    for (Map.Entry<DocumentKey, Mutation> entry : overlays.entrySet()) {
+    for (Map.Entry<DocumentKey, Overlay> entry : overlays.entrySet()) {
       if (!remoteDocuments.containsKey(entry.getKey())) {
         remoteDocuments.put(entry.getKey(), MutableDocument.newInvalidDocument(entry.getKey()));
       }
@@ -273,16 +376,38 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
     // Apply the overlays and match against the query.
     ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
     for (Map.Entry<DocumentKey, MutableDocument> docEntry : remoteDocuments.entrySet()) {
-      Mutation overlay = overlays.get(docEntry.getKey());
+      Overlay overlay = overlays.get(docEntry.getKey());
       if (overlay != null) {
-        overlay.applyToLocalView(docEntry.getValue(), null, Timestamp.now());
+        overlay.getMutation().applyToLocalView(docEntry.getValue(), null, Timestamp.now());
       }
       // Finally, insert the documents that still match the query
       if (query.matches(docEntry.getValue())) {
         results = results.insert(docEntry.getKey(), docEntry.getValue());
       }
     }
 
     return results;
   }

+  /**
+   * Returns the offset for the index based on the newly indexed documents.
+   *
+   * <p>If there are no documents, returns an offset with the latest remote version.
+   */
+  private IndexOffset getNewOffset(
+      ImmutableSortedMap<DocumentKey, Document> documents, IndexOffset currentOffset) {
+    if (documents.isEmpty()) {
+      return IndexOffset.create(
+          remoteDocumentCache.getLatestReadTime(), currentOffset.getLargestBatchId());
+    } else {
+      IndexOffset latestOffset = currentOffset;
+      Iterator<Map.Entry<DocumentKey, Document>> it = documents.iterator();
+      while (it.hasNext()) {
+        IndexOffset newOffset = IndexOffset.fromDocument(it.next().getValue());
+        if (newOffset.compareTo(latestOffset) > 0) {
+          latestOffset = newOffset;
+        }
+      }
+      return latestOffset;
+    }
+  }
 }

Review thread on the documents.isEmpty() branch of getNewOffset:

Contributor: FWIW This should eventually live in the IndexBackfiller.

Contributor: Discussed offline, this probably does not and we need to leave it here.