
Backfill Mutations #3323


Merged · 8 commits · Jan 24, 2022
@@ -32,9 +32,9 @@
import com.google.firebase.firestore.bundle.BundleMetadata;
import com.google.firebase.firestore.bundle.BundleReader;
import com.google.firebase.firestore.core.ViewSnapshot.SyncState;
import com.google.firebase.firestore.local.LocalDocumentsResult;
import com.google.firebase.firestore.local.LocalStore;
import com.google.firebase.firestore.local.LocalViewChanges;
import com.google.firebase.firestore.local.LocalWriteResult;
import com.google.firebase.firestore.local.QueryPurpose;
import com.google.firebase.firestore.local.QueryResult;
import com.google.firebase.firestore.local.ReferenceSet;
@@ -274,10 +274,10 @@ void stopListening(Query query) {
public void writeMutations(List<Mutation> mutations, TaskCompletionSource<Void> userTask) {
assertCallback("writeMutations");

LocalWriteResult result = localStore.writeLocally(mutations);
LocalDocumentsResult result = localStore.writeLocally(mutations);
addUserCallback(result.getBatchId(), userTask);

emitNewSnapsAndNotifyLocalStore(result.getChanges(), /*remoteEvent=*/ null);
emitNewSnapsAndNotifyLocalStore(result.getDocuments(), /*remoteEvent=*/ null);
remoteStore.fillWritePipeline();
}

@@ -18,7 +18,6 @@

import androidx.annotation.Nullable;
import androidx.annotation.VisibleForTesting;
import com.google.firebase.database.collection.ImmutableSortedMap;
import com.google.firebase.firestore.model.Document;
import com.google.firebase.firestore.model.DocumentKey;
import com.google.firebase.firestore.model.FieldIndex;
@@ -45,15 +44,13 @@ public class IndexBackfiller {

private final Scheduler scheduler;
private final Persistence persistence;
private final RemoteDocumentCache remoteDocumentCache;
private LocalDocumentsView localDocumentsView;
private IndexManager indexManager;
private int maxDocumentsToProcess = MAX_DOCUMENTS_TO_PROCESS;

public IndexBackfiller(Persistence persistence, AsyncQueue asyncQueue) {
this.persistence = persistence;
this.scheduler = new Scheduler(asyncQueue);
this.remoteDocumentCache = persistence.getRemoteDocumentCache();
}

public void setLocalDocumentsView(LocalDocumentsView localDocumentsView) {
@@ -110,12 +107,11 @@ public Scheduler getScheduler() {
public int backfill() {
hardAssert(localDocumentsView != null, "setLocalDocumentsView() not called");
hardAssert(indexManager != null, "setIndexManager() not called");
return persistence.runTransaction(
"Backfill Indexes", () -> writeIndexEntries(localDocumentsView));
return persistence.runTransaction("Backfill Indexes", () -> this.writeIndexEntries());
}

/** Writes index entries until the cap is reached. Returns the number of documents processed. */
private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
private int writeIndexEntries() {
Set<String> processedCollectionGroups = new HashSet<>();
int documentsRemaining = maxDocumentsToProcess;
while (documentsRemaining > 0) {
@@ -124,58 +120,63 @@ private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
break;
}
Logger.debug(LOG_TAG, "Processing collection: %s", collectionGroup);
documentsRemaining -=
writeEntriesForCollectionGroup(localDocumentsView, collectionGroup, documentsRemaining);
documentsRemaining -= writeEntriesForCollectionGroup(collectionGroup, documentsRemaining);
processedCollectionGroups.add(collectionGroup);
}
return maxDocumentsToProcess - documentsRemaining;
}

/** Writes entries for the fetched field indexes. */
/**
* Writes entries for the provided collection group. Returns the number of documents processed.
*/
private int writeEntriesForCollectionGroup(
LocalDocumentsView localDocumentsView, String collectionGroup, int entriesRemainingUnderCap) {
// TODO(indexing): Support mutation batch Ids when sorting and writing indexes.

String collectionGroup, int documentsRemainingUnderCap) {
// Use the earliest offset of all field indexes to query the local cache.
IndexOffset existingOffset = getExistingOffset(indexManager.getFieldIndexes(collectionGroup));
ImmutableSortedMap<DocumentKey, Document> documents =
localDocumentsView.getDocuments(collectionGroup, existingOffset, entriesRemainingUnderCap);
indexManager.updateIndexEntries(documents);
Collection<FieldIndex> fieldIndexes = indexManager.getFieldIndexes(collectionGroup);
IndexOffset existingOffset = getExistingOffset(fieldIndexes);

LocalDocumentsResult nextBatch =
localDocumentsView.getNextDocuments(
collectionGroup, existingOffset, documentsRemainingUnderCap);
indexManager.updateIndexEntries(nextBatch.getDocuments());

IndexOffset newOffset = getNewOffset(documents, existingOffset);
IndexOffset newOffset = getNewOffset(existingOffset, nextBatch);

Review comment: As discussed, please add some tests to verify offset correctness when a new field index is added to a collection group that has already been partially indexed.

Review comment: Similarly, can we add a test to check that we call computeViews() to properly fetch the base doc for a patch mutation in an overlay without a corresponding base doc?

Author reply: Added a couple of tests to cover all these scenarios.

indexManager.updateCollectionGroup(collectionGroup, newOffset);

return documents.size();
return nextBatch.getDocuments().size();
}

/** Returns the lowest offset for the provided index group. */
private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
IndexOffset lowestOffset = null;
for (FieldIndex fieldIndex : fieldIndexes) {
if (lowestOffset == null
|| fieldIndex.getIndexState().getOffset().compareTo(lowestOffset) < 0) {
lowestOffset = fieldIndex.getIndexState().getOffset();
/** Returns the next offset based on the provided documents. */
private IndexOffset getNewOffset(IndexOffset existingOffset, LocalDocumentsResult lookupResult) {
IndexOffset maxOffset = existingOffset;
for (Map.Entry<DocumentKey, Document> entry : lookupResult.getDocuments()) {
IndexOffset newOffset = IndexOffset.fromDocument(entry.getValue());
if (newOffset.compareTo(maxOffset) > 0) {
maxOffset = newOffset;
}
}
return lowestOffset == null ? IndexOffset.NONE : lowestOffset;
return IndexOffset.create(
maxOffset.getReadTime(),
maxOffset.getDocumentKey(),
Math.max(lookupResult.getBatchId(), existingOffset.getLargestBatchId()));
}

/** Returns the offset for the index based on the newly indexed documents. */
private IndexOffset getNewOffset(
ImmutableSortedMap<DocumentKey, Document> documents, IndexOffset currentOffset) {
if (documents.isEmpty()) {
return IndexOffset.create(remoteDocumentCache.getLatestReadTime());
} else {
IndexOffset latestOffset = currentOffset;
Iterator<Map.Entry<DocumentKey, Document>> it = documents.iterator();
while (it.hasNext()) {
IndexOffset newOffset = IndexOffset.fromDocument(it.next().getValue());
if (newOffset.compareTo(latestOffset) > 0) {
latestOffset = newOffset;
}
/** Returns the lowest offset for the provided index group. */
private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
hardAssert(!fieldIndexes.isEmpty(), "Updating collection without indexes");

Iterator<FieldIndex> it = fieldIndexes.iterator();
IndexOffset minOffset = it.next().getIndexState().getOffset();
int minBatchId = minOffset.getLargestBatchId();
while (it.hasNext()) {
IndexOffset newOffset = it.next().getIndexState().getOffset();
if (newOffset.compareTo(minOffset) < 0) {
minOffset = newOffset;
}
return latestOffset;
minBatchId = Math.max(newOffset.getLargestBatchId(), minBatchId);
}

return IndexOffset.create(minOffset.getReadTime(), minOffset.getDocumentKey(), minBatchId);
}

@VisibleForTesting
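To make the offset bookkeeping above easier to follow, here is a minimal standalone sketch, with a simplified `Offset` class standing in for the SDK's `FieldIndex.IndexOffset` (names and numbers are illustrative, not the real API): the backfiller starts from the lowest offset across a collection group's field indexes while carrying the largest batch ID already seen, and after indexing a page it advances to the largest document offset and the larger of the two batch IDs.

```java
import java.util.*;

// Simplified stand-in for FieldIndex.IndexOffset, used only for this sketch.
final class Offset implements Comparable<Offset> {
  final long readTime;       // stand-in for the offset's read time (SnapshotVersion)
  final String documentKey;  // stand-in for the offset's document key
  final int largestBatchId;  // largest mutation batch id covered by this offset

  Offset(long readTime, String documentKey, int largestBatchId) {
    this.readTime = readTime;
    this.documentKey = documentKey;
    this.largestBatchId = largestBatchId;
  }

  @Override
  public int compareTo(Offset other) {
    // Order by read time, then key; the batch id is tracked separately.
    int cmp = Long.compare(readTime, other.readTime);
    return cmp != 0 ? cmp : documentKey.compareTo(other.documentKey);
  }
}

public class OffsetMathSketch {
  /** Lowest offset across the group's field indexes, but the largest batch id seen so far. */
  static Offset existingOffset(List<Offset> fieldIndexOffsets) {
    Offset min = fieldIndexOffsets.get(0);
    int maxBatchId = min.largestBatchId;
    for (Offset o : fieldIndexOffsets) {
      if (o.compareTo(min) < 0) min = o;
      maxBatchId = Math.max(maxBatchId, o.largestBatchId);
    }
    return new Offset(min.readTime, min.documentKey, maxBatchId);
  }

  /** Advance to the largest document offset in the page; keep the larger batch id. */
  static Offset newOffset(Offset existing, List<Offset> documentOffsets, int batchIdOfLookup) {
    Offset max = existing;
    for (Offset o : documentOffsets) {
      if (o.compareTo(max) > 0) max = o;
    }
    return new Offset(max.readTime, max.documentKey, Math.max(batchIdOfLookup, existing.largestBatchId));
  }

  public static void main(String[] args) {
    // Two field indexes in the same collection group, one already further along than the other.
    Offset existing = existingOffset(Arrays.asList(
        new Offset(10, "coll/a", 2),
        new Offset(25, "coll/k", 5)));
    // existing -> readTime 10, key "coll/a", batch id 5

    // The next page of documents read from the cache, plus the batch id reported by the lookup.
    Offset advanced = newOffset(existing,
        Arrays.asList(new Offset(12, "coll/b", -1), new Offset(18, "coll/c", -1)), 7);
    System.out.println(advanced.readTime + " " + advanced.documentKey + " " + advanced.largestBatchId);
    // -> 18 coll/c 7
  }
}
```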
@@ -18,22 +18,26 @@
import com.google.firebase.firestore.model.Document;
import com.google.firebase.firestore.model.DocumentKey;

/** The result of a write to the local store. */
public final class LocalWriteResult {
/**
* Represents a set of documents along with their mutation batch ID.
*
* <p>This class is used when applying mutations to the local store and to propagate document
* updates to the indexing table.
*/
public final class LocalDocumentsResult {
private final int batchId;
private final ImmutableSortedMap<DocumentKey, Document> documents;

private final ImmutableSortedMap<DocumentKey, Document> changes;

LocalWriteResult(int batchId, ImmutableSortedMap<DocumentKey, Document> changes) {
LocalDocumentsResult(int batchId, ImmutableSortedMap<DocumentKey, Document> documents) {
this.batchId = batchId;
this.changes = changes;
this.documents = documents;
}

public int getBatchId() {
return batchId;
}

public ImmutableSortedMap<DocumentKey, Document> getChanges() {
return changes;
public ImmutableSortedMap<DocumentKey, Document> getDocuments() {
return documents;
}
}
@@ -17,12 +17,14 @@
import static com.google.firebase.firestore.model.DocumentCollections.emptyDocumentMap;
import static com.google.firebase.firestore.util.Assert.hardAssert;

import androidx.annotation.Nullable;
import androidx.annotation.VisibleForTesting;
import com.google.firebase.Timestamp;
import com.google.firebase.database.collection.ImmutableSortedMap;
import com.google.firebase.firestore.core.Query;
import com.google.firebase.firestore.model.Document;
import com.google.firebase.firestore.model.DocumentKey;
import com.google.firebase.firestore.model.FieldIndex;
import com.google.firebase.firestore.model.FieldIndex.IndexOffset;
import com.google.firebase.firestore.model.MutableDocument;
import com.google.firebase.firestore.model.ResourcePath;
@@ -31,6 +33,7 @@
import com.google.firebase.firestore.model.mutation.MutationBatch;
import com.google.firebase.firestore.model.mutation.Overlay;
import com.google.firebase.firestore.model.mutation.PatchMutation;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -84,15 +87,10 @@ DocumentOverlayCache getDocumentOverlayCache() {
*/
Document getDocument(DocumentKey key) {
Overlay overlay = documentOverlayCache.getOverlay(key);
// Only read from remote document cache if overlay is a patch.
MutableDocument document =
(overlay == null || overlay.getMutation() instanceof PatchMutation)
? remoteDocumentCache.get(key)
: MutableDocument.newInvalidDocument(key);
MutableDocument document = getBaseDocument(key, overlay);
if (overlay != null) {
overlay.getMutation().applyToLocalView(document, null, Timestamp.now());
}

return document;
}

@@ -117,21 +115,35 @@ ImmutableSortedMap<DocumentKey, Document> getDocuments(Iterable<DocumentKey> key
*/
ImmutableSortedMap<DocumentKey, Document> getLocalViewOfDocuments(
Map<DocumentKey, MutableDocument> docs, Set<DocumentKey> existenceStateChanged) {
return computeViews(docs, Collections.emptyMap(), existenceStateChanged);
}

/**
* Computes the local view for doc, applying overlays from both {@code memoizedOverlays} and the
* overlay cache.
*/
private ImmutableSortedMap<DocumentKey, Document> computeViews(
Map<DocumentKey, MutableDocument> docs,
Map<DocumentKey, Overlay> memoizedOverlays,
Set<DocumentKey> existenceStateChanged) {
ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
Map<DocumentKey, MutableDocument> recalculateDocuments = new HashMap<>();
for (Map.Entry<DocumentKey, MutableDocument> entry : docs.entrySet()) {
Overlay overlay = documentOverlayCache.getOverlay(entry.getKey());
for (MutableDocument doc : docs.values()) {
Overlay overlay =
memoizedOverlays.containsKey(doc.getKey())
? memoizedOverlays.get(doc.getKey())
: documentOverlayCache.getOverlay(doc.getKey());
// Recalculate an overlay if the document's existence state is changed due to a remote
// event *and* the overlay is a PatchMutation. This is because document existence state
// can change if some patch mutation's preconditions are met.
// NOTE: we recalculate when `overlay` is null as well, because there might be a patch
// mutation whose precondition does not match before the change (hence overlay==null),
// but would now match.
if (existenceStateChanged.contains(entry.getKey())
if (existenceStateChanged.contains(doc.getKey())
&& (overlay == null || overlay.getMutation() instanceof PatchMutation)) {
recalculateDocuments.put(entry.getKey(), docs.get(entry.getKey()));
recalculateDocuments.put(doc.getKey(), doc);
} else if (overlay != null) {
overlay.getMutation().applyToLocalView(entry.getValue(), null, Timestamp.now());
overlay.getMutation().applyToLocalView(doc, null, Timestamp.now());
}
}

@@ -190,14 +202,6 @@ void recalculateAndSaveOverlays(Set<DocumentKey> documentKeys) {
recalculateAndSaveOverlays(docs);
}

/** Gets the local view of the next {@code count} documents based on their read time. */
ImmutableSortedMap<DocumentKey, Document> getDocuments(
String collectionGroup, IndexOffset offset, int count) {
Map<DocumentKey, MutableDocument> docs =
remoteDocumentCache.getAll(collectionGroup, offset, count);
return getLocalViewOfDocuments(docs, new HashSet<>());
}

/**
* Performs a query against the local view of all documents.
*
@@ -250,7 +254,47 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
return results;
}

/** Queries the remote documents and overlays by doing a full collection scan. */
/**
* Given a collection group, returns the next documents that follow the provided offset, along
* with an updated batch ID.
*
* <p>The documents returned by this method are ordered by remote version from the provided
* offset. If there are no more remote documents after the provided offset, documents with
* mutations in order of batch id from the offset are returned. Since all documents in a batch are
* returned together, the total number of documents returned can exceed {@code count}.
*
* @param collectionGroup The collection group for the documents.
* @param offset The offset to index into.
* @param count The number of documents to return
* @return A LocalDocumentsResult with the documents that follow the provided offset and the last
* processed batch id.
*/
LocalDocumentsResult getNextDocuments(String collectionGroup, IndexOffset offset, int count) {
Map<DocumentKey, MutableDocument> docs =
remoteDocumentCache.getAll(collectionGroup, offset, count);
Map<DocumentKey, Overlay> overlays =
count - docs.size() > 0
? documentOverlayCache.getOverlays(
collectionGroup, offset.getLargestBatchId(), count - docs.size())
: Collections.emptyMap();

int largestBatchId = FieldIndex.INITIAL_LARGEST_BATCH_ID;
for (Overlay overlay : overlays.values()) {
if (!docs.containsKey(overlay.getKey())) {
docs.put(overlay.getKey(), getBaseDocument(overlay.getKey(), overlay));
}
// The callsite will use the largest batch ID together with the latest read time to create
// a new index offset. Since we only process batch IDs if all remote documents have been read,
// no overlay will increase the overall read time. This is why we only need to special case
// the batch id.
largestBatchId = Math.max(largestBatchId, overlay.getLargestBatchId());
}

ImmutableSortedMap<DocumentKey, Document> localDocs =
computeViews(docs, overlays, Collections.emptySet());
return new LocalDocumentsResult(largestBatchId, localDocs);
}
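A purely illustrative sketch of the selection rule documented above, using hypothetical stand-in types rather than the SDK's classes: remote documents, ordered by read time, are consumed first; overlays, ordered by batch ID, are only consulted once the remote cache holds fewer than `count` documents past the offset. Unlike the real implementation, which returns whole mutation batches (so a page can exceed `count`), this sketch simply truncates.

```java
import java.util.*;

// Hypothetical, simplified model of the getNextDocuments() selection rule.
public class NextDocumentsSketch {
  static final int INITIAL_LARGEST_BATCH_ID = -1; // stand-in for FieldIndex.INITIAL_LARGEST_BATCH_ID

  /** Returns the keys of the next page plus the largest overlay batch id that was consumed. */
  static Map.Entry<List<String>, Integer> nextDocuments(
      List<String> remoteKeysAfterOffset,          // already ordered by read time
      Map<String, Integer> overlayBatchIdsByKey,   // already filtered to batch ids past the offset
      int count) {
    List<String> keys = new ArrayList<>(
        remoteKeysAfterOffset.subList(0, Math.min(count, remoteKeysAfterOffset.size())));

    int largestBatchId = INITIAL_LARGEST_BATCH_ID;
    int remaining = count - keys.size();
    if (remaining > 0) {
      // Top up with mutated documents, smallest batch id first, remembering the largest id seen.
      List<Map.Entry<String, Integer>> overlays = new ArrayList<>(overlayBatchIdsByKey.entrySet());
      overlays.sort(Map.Entry.comparingByValue());
      for (Map.Entry<String, Integer> overlay :
          overlays.subList(0, Math.min(remaining, overlays.size()))) {
        if (!keys.contains(overlay.getKey())) keys.add(overlay.getKey());
        largestBatchId = Math.max(largestBatchId, overlay.getValue());
      }
    }
    return Map.entry(keys, largestBatchId);
  }

  public static void main(String[] args) {
    // Remote cache still has enough documents past the offset, so no overlays are consumed.
    System.out.println(nextDocuments(List.of("coll/a", "coll/b"), Map.of("coll/x", 3), 2));
    // Remote cache runs dry, so the page is topped up from overlays and the batch id advances to 4.
    System.out.println(nextDocuments(List.of("coll/a"), Map.of("coll/x", 3, "coll/y", 4), 3));
  }
}
```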

Review comment: Trying to follow along for getDocumentsMatchingCollectionQuery(). If we get the overlays in L298, don't we want to run createBaseDocument() to get the original remote base doc for the overlay rather than just use a newInvalidDocument()?

private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollectionQuery(
Query query, IndexOffset offset) {
Map<DocumentKey, MutableDocument> remoteDocuments =
@@ -281,4 +325,11 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection

return results;
}

/** Returns a base document that can be used to apply `overlay`. */

Review comment: Optional reword: "Returns the base document that corresponds to the provided overlay."

Author reply: Skipped, as "corresponds" doesn't sound correct when we return an invalid document for a set() even though there is actually a valid document in the cache.

private MutableDocument getBaseDocument(DocumentKey key, @Nullable Overlay overlay) {
return (overlay == null || overlay.getMutation() instanceof PatchMutation)
? remoteDocumentCache.get(key)
: MutableDocument.newInvalidDocument(key);
}
}
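To make the base-document rule concrete, here is a tiny hypothetical sketch using plain maps in place of `MutableDocument`: a set() overlay replaces the whole document, so applying it to an empty, invalid base yields the correct local view, whereas a patch() overlay only touches some fields and therefore needs the cached remote document as its base.

```java
import java.util.*;

// Hypothetical, simplified model of the rule in getBaseDocument(); plain maps stand in for documents.
public class BaseDocumentSketch {

  /** A set() overlay replaces the document outright, so the base document's contents never matter. */
  static Map<String, Object> applySet(Map<String, Object> setData) {
    return new HashMap<>(setData);
  }

  /** A patch() overlay merges onto the base document, so unpatched fields must come from the cache. */
  static Map<String, Object> applyPatch(Map<String, Object> baseDoc, Map<String, Object> patchData) {
    Map<String, Object> result = new HashMap<>(baseDoc);
    result.putAll(patchData);
    return result;
  }

  public static void main(String[] args) {
    Map<String, Object> remoteDoc = Map.of("name", "Ada", "score", 10);

    // set(): starting from an empty "invalid" base yields the same view as starting from remoteDoc.
    System.out.println(applySet(Map.of("name", "Grace")));          // {name=Grace}

    // patch(): keeping the remote base preserves "name"; dropping it would lose the field.
    System.out.println(applyPatch(remoteDoc, Map.of("score", 11))); // name=Ada, score=11 (order may vary)
    System.out.println(applyPatch(Map.of(), Map.of("score", 11)));  // score=11 -- an incomplete view
  }
}
```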
@@ -236,7 +236,7 @@ public ImmutableSortedMap<DocumentKey, Document> handleUserChange(User user) {
}

/** Accepts locally generated Mutations and commits them to storage. */
public LocalWriteResult writeLocally(List<Mutation> mutations) {
public LocalDocumentsResult writeLocally(List<Mutation> mutations) {
Timestamp localWriteTime = Timestamp.now();

// TODO: Call queryEngine.handleDocumentChange() appropriately.
@@ -277,7 +277,7 @@ public LocalWriteResult writeLocally(List<Mutation> mutations) {
mutationQueue.addMutationBatch(localWriteTime, baseMutations, mutations);
Map<DocumentKey, Mutation> overlays = batch.applyToLocalDocumentSet(documents);
documentOverlayCache.saveOverlays(batch.getBatchId(), overlays);
return new LocalWriteResult(batch.getBatchId(), documents);
return new LocalDocumentsResult(batch.getBatchId(), documents);
});
}
