Skip to content

Commit 6d64d18

Browse files
author
Brian Chen
committed
add mutation batch id to index backfill
1 parent 9e5c8e1 commit 6d64d18

File tree

11 files changed

+411
-45
lines changed

11 files changed

+411
-45
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/index/IndexEntry.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.auto.value.AutoValue;
2121
import com.google.firebase.firestore.model.DocumentKey;
2222
import com.google.firebase.firestore.util.Util;
23+
import java.util.Comparator;
2324

2425
/** Represents an index entry saved by the SDK in its local storage. */
2526
@AutoValue
@@ -53,4 +54,9 @@ public int compareTo(IndexEntry other) {
5354

5455
return nullSafeCompare(getArrayValue(), other.getArrayValue(), Util::compareByteArrays);
5556
}
57+
58+
public static final Comparator<IndexEntry> SEMANTIC_COMPARATOR =
59+
(left, right) -> {
60+
return left.compareTo(right);
61+
};
5662
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/DocumentOverlayCache.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package com.google.firebase.firestore.local;
1616

17+
import android.util.Pair;
1718
import androidx.annotation.Nullable;
1819
import com.google.firebase.firestore.model.DocumentKey;
1920
import com.google.firebase.firestore.model.ResourcePath;
@@ -54,4 +55,14 @@ public interface DocumentOverlayCache {
5455
* change past `sinceBatchId` are returned.
5556
*/
5657
Map<DocumentKey, Mutation> getOverlays(ResourcePath collection, int sinceBatchId);
58+
59+
/**
 * Returns the overlays of a collection together with the batch id stored for each overlay.
 *
 * @param collection The collection path to get the overlays for.
 * @param sinceBatchId The minimum batch ID to filter by (exclusive). Only overlays that contain a
 *     change past {@code sinceBatchId} are returned.
 * @return A map from document key to a pair whose {@code first} is the overlay's largest batch id
 *     and whose {@code second} is the overlay mutation.
 */
66+
Map<DocumentKey, Pair<Integer, Mutation>> getOverlaysWithBatchId(
67+
ResourcePath collection, int sinceBatchId);
5768
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/IndexBackfiller.java

Lines changed: 74 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,23 +137,56 @@ private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
137137

138138
/**
 * Writes index entries for the field indexes of a single collection group and stores the group's
 * new offset.
 *
 * <p>The added version works in two passes. The first pass indexes the documents selected by
 * {@code getOldestDocuments} from the local query at the group's existing offset. If fewer
 * documents than the cap were written, the second pass indexes documents returned by
 * {@code getDocumentsBatchIdsMatchingCollectionGroupQuery}, selected by
 * {@code getDocumentsByBatchId}, and moves the offset's largest batch id forward.
 *
 * @param localDocumentsView the view used to run both local queries
 * @param collectionGroup the collection group whose field indexes are backfilled
 * @param documentsRemainingUnderCap the limit handed to the document selection helpers
 * @return the number of documents passed to {@code indexManager.updateIndexEntries}
 */
139139
private int writeEntriesForCollectionGroup(
140-
LocalDocumentsView localDocumentsView, String collectionGroup, int entriesRemainingUnderCap) {
140+
LocalDocumentsView localDocumentsView,
141+
String collectionGroup,
142+
int documentsRemainingUnderCap) {
143+
int documentsProcessed = 0;
141144
Query query = new Query(ResourcePath.EMPTY, collectionGroup);
142145

143146
// Use the earliest offset of all field indexes to query the local cache.
144-
IndexOffset existingOffset = getExistingOffset(indexManager.getFieldIndexes(collectionGroup));
147+
Collection<FieldIndex> fieldIndexes = indexManager.getFieldIndexes(collectionGroup);
148+
IndexOffset existingOffset = getExistingOffset(fieldIndexes);
145149

146150
// TODO(indexing): Use limit queries to only fetch the required number of entries.
147-
// TODO(indexing): Support mutation batch Ids when sorting and writing indexes.
148151
ImmutableSortedMap<DocumentKey, Document> documents =
149152
localDocumentsView.getDocumentsMatchingQuery(query, existingOffset);
150153

151-
List<Document> oldestDocuments = getOldestDocuments(documents, entriesRemainingUnderCap);
154+
List<Document> oldestDocuments = getOldestDocuments(documents, documentsRemainingUnderCap);
152155
indexManager.updateIndexEntries(oldestDocuments);
156+
documentsProcessed += oldestDocuments.size();
153157

154158
// First-pass offset; its batch id component is replaced below when the second pass runs.
IndexOffset newOffset = getNewOffset(oldestDocuments, existingOffset);
159+
160+
// Start indexing mutations only after catching up for read time. This must be done separately
161+
// since the backfill's first pass is in read time order rather than mutation batch id order.
162+
int documentsRemaining = documentsRemainingUnderCap - oldestDocuments.size();
163+
if (documentsRemaining > 0) {
164+
int earliestBatchId = getEarliestBatchId(fieldIndexes);
165+
// Keep the read time and document key from the first pass; only the batch id changes.
newOffset =
166+
IndexOffset.create(newOffset.getReadTime(), newOffset.getDocumentKey(), earliestBatchId);
167+
Map<Document, Integer> documentToBatchId =
168+
localDocumentsView.getDocumentsBatchIdsMatchingCollectionGroupQuery(query, newOffset);
169+
170+
// Write index entries for documents touched by local mutations and update the offset to the
171+
// new largest batch id.
172+
if (!documentToBatchId.isEmpty()) {
173+
List<Document> documentsByBatchId =
174+
getDocumentsByBatchId(documentToBatchId, documentsRemaining);
175+
176+
// Largest batch id is the last element in documentsByBatchId since it's sorted in ascending
177+
// order by batch id.
178+
int newLargestBatchId =
179+
documentToBatchId.get(documentsByBatchId.get(documentsByBatchId.size() - 1));
180+
newOffset =
181+
IndexOffset.create(
182+
newOffset.getReadTime(), newOffset.getDocumentKey(), newLargestBatchId);
183+
indexManager.updateIndexEntries(documentsByBatchId);
184+
documentsProcessed += documentsByBatchId.size();
185+
}
186+
}
187+
155188
// Store the offset reached by whichever passes ran.
indexManager.updateCollectionGroup(collectionGroup, newOffset);
156-
return oldestDocuments.size();
189+
return documentsProcessed;
157190
}
158191

159192
/** Returns the lowest offset for the provided index group. */
@@ -169,7 +202,8 @@ private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
169202
}
170203

171204
/**
172-
* Returns the offset for the index based on the newly indexed documents.
205+
* Returns the read time and document key offset for the index based on the newly indexed
206+
* documents.
173207
*
174208
* @param documents a list of documents sorted by read time and key (ascending)
175209
* @param currentOffset the current offset of the index group
@@ -201,6 +235,40 @@ private List<Document> getOldestDocuments(
201235
return oldestDocuments.subList(0, Math.min(count, oldestDocuments.size()));
202236
}
203237

238+
/**
239+
* Returns up to {@code count} documents sorted by batch id ascending, except in cases where the
240+
* mutation batch id contains more documents. This method will always return all documents in a
241+
* mutation batch, even if it goes over the limit.
242+
*
243+
* <p>For example, if count = 2, and we have: (docA, batch1), (docB, batch2), and (docC, batch2),
244+
* all three documents will be returned since docC is also part of batch2.
245+
*/
246+
private List<Document> getDocumentsByBatchId(
247+
Map<Document, Integer> documentToBatchId, int count) {
248+
List<Document> oldestDocuments = new ArrayList<>(documentToBatchId.keySet());
249+
Collections.sort(
250+
oldestDocuments, (l, r) -> documentToBatchId.get(l).compareTo(documentToBatchId.get(r)));
251+
int i = Math.min(count, oldestDocuments.size()) - 1;
252+
253+
// Include all documents that match the last document's batch id.
254+
int lastBatchId = documentToBatchId.get(oldestDocuments.get(i));
255+
while (i < oldestDocuments.size()
256+
&& lastBatchId == documentToBatchId.get(oldestDocuments.get(i))) {
257+
++i;
258+
}
259+
return oldestDocuments.subList(0, i);
260+
}
261+
262+
/** Returns the earliest batch id from the specified field indexes. */
263+
private int getEarliestBatchId(Collection<FieldIndex> fieldIndexes) {
264+
int lowestBatchId = (int) Double.POSITIVE_INFINITY;
265+
for (FieldIndex fieldIndex : fieldIndexes) {
266+
lowestBatchId =
267+
Math.min(fieldIndex.getIndexState().getOffset().getLargestBatchId(), lowestBatchId);
268+
}
269+
return lowestBatchId;
270+
}
271+
204272
@VisibleForTesting
205273
void setMaxDocumentsToProcess(int newMax) {
206274
maxDocumentsToProcess = newMax;

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalDocumentsView.java

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import static com.google.firebase.firestore.model.DocumentCollections.emptyDocumentMap;
1818
import static com.google.firebase.firestore.util.Assert.hardAssert;
1919

20+
import android.util.Pair;
2021
import androidx.annotation.VisibleForTesting;
2122
import com.google.firebase.Timestamp;
2223
import com.google.firebase.database.collection.ImmutableSortedMap;
@@ -316,7 +317,32 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
316317
}
317318
}
318319

319-
/** Queries the remote documents and overlays by doing a full collection scan. */
320+
/**
321+
* Returns a mapping of documents that match the provided query and offset mapped to the
322+
* corresponding batch id.
323+
*/
324+
// TODO(indexing): Support adding local mutations to collection group table.
325+
public Map<Document, Integer> getDocumentsBatchIdsMatchingCollectionGroupQuery(
326+
Query query, IndexOffset offset) {
327+
hardAssert(
328+
query.getPath().isEmpty(),
329+
"Currently we only support collection group queries at the root.");
330+
String collectionId = query.getCollectionGroup();
331+
Map<Document, Integer> results = new HashMap<>();
332+
List<ResourcePath> parents = indexManager.getCollectionParents(collectionId);
333+
334+
// Perform a collection query against each parent that contains the collectionId and
335+
// aggregate the results.
336+
for (ResourcePath parent : parents) {
337+
Query collectionQuery = query.asCollectionQueryAtPath(parent.append(collectionId));
338+
Map<Document, Integer> documentToBatchIds =
339+
getDocumentsMatchingCollectionQueryWithBatchId(collectionQuery, offset);
340+
;
341+
results.putAll(documentToBatchIds);
342+
}
343+
return results;
344+
}
345+
320346
private ImmutableSortedMap<DocumentKey, Document>
321347
getDocumentsMatchingCollectionQueryFromOverlayCache(Query query, IndexOffset offset) {
322348
ImmutableSortedMap<DocumentKey, MutableDocument> remoteDocuments =
@@ -325,27 +351,65 @@ private ImmutableSortedMap<DocumentKey, Document> getDocumentsMatchingCollection
325351

326352
// As documents might match the query because of their overlay we need to include all documents
327353
// in the result.
354+
remoteDocuments = updateRemoteDocumentsWithOverlayDocuments(remoteDocuments, overlays.keySet());
355+
356+
// Apply the overlays and match against the query.
357+
ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
358+
for (Map.Entry<DocumentKey, MutableDocument> docEntry : remoteDocuments) {
359+
Mutation overlay = overlays.get(docEntry.getKey());
360+
if (overlay != null) {
361+
overlay.applyToLocalView(docEntry.getValue(), null, Timestamp.now());
362+
}
363+
// Finally, insert the documents that still match the query
364+
if (query.matches(docEntry.getValue())) {
365+
results = results.insert(docEntry.getKey(), docEntry.getValue());
366+
}
367+
}
368+
369+
return results;
370+
}
371+
372+
/**
 * Updates the provided remote documents to include documents in the overlay.
 *
 * <p>Keys in {@code overlayDocumentKeys} that are absent from {@code remoteDocuments} are looked
 * up through {@code remoteDocumentCache.getAll} and the returned entries are inserted.
 *
 * @param remoteDocuments the documents already loaded for the query
 * @param overlayDocumentKeys the keys of all documents that have an overlay
 * @return the map containing the original entries plus the entries fetched for the missing keys
 */
373+
private ImmutableSortedMap<DocumentKey, MutableDocument>
374+
updateRemoteDocumentsWithOverlayDocuments(
375+
ImmutableSortedMap<DocumentKey, MutableDocument> remoteDocuments,
376+
Set<DocumentKey> overlayDocumentKeys) {
328377
// Collect the overlay keys that have no entry in the remote documents yet.
Set<DocumentKey> missingDocuments = new HashSet<>();
329-
for (Map.Entry<DocumentKey, Mutation> entry : overlays.entrySet()) {
330-
if (!remoteDocuments.containsKey(entry.getKey())) {
331-
missingDocuments.add(entry.getKey());
378+
for (DocumentKey key : overlayDocumentKeys) {
379+
if (!remoteDocuments.containsKey(key)) {
380+
missingDocuments.add(key);
332381
}
333382
}
334383
// insert() returns a map rather than mutating in place, so the result is reassigned.
for (Map.Entry<DocumentKey, MutableDocument> entry :
335384
remoteDocumentCache.getAll(missingDocuments).entrySet()) {
336385
remoteDocuments = remoteDocuments.insert(entry.getKey(), entry.getValue());
337386
}
387+
return remoteDocuments;
388+
}
389+
390+
/** Queries the remote documents and overlays by doing a full collection scan. */
391+
public Map<Document, Integer> getDocumentsMatchingCollectionQueryWithBatchId(
392+
Query query, IndexOffset offset) {
393+
ImmutableSortedMap<DocumentKey, MutableDocument> remoteDocuments =
394+
remoteDocumentCache.getAllDocumentsMatchingQuery(query, offset);
395+
Map<DocumentKey, Pair<Integer, Mutation>> overlays =
396+
documentOverlayCache.getOverlaysWithBatchId(query.getPath(), offset.getLargestBatchId());
397+
398+
// As documents might match the query because of their overlay we need to include all documents
399+
// in the result.
400+
remoteDocuments = updateRemoteDocumentsWithOverlayDocuments(remoteDocuments, overlays.keySet());
338401

339402
// Apply the overlays and match against the query.
340-
ImmutableSortedMap<DocumentKey, Document> results = emptyDocumentMap();
403+
Map<Document, Integer> results = new HashMap<>();
341404
for (Map.Entry<DocumentKey, MutableDocument> docEntry : remoteDocuments) {
342-
Mutation overlay = overlays.get(docEntry.getKey());
405+
int batchId = overlays.get(docEntry.getKey()).first;
406+
Mutation overlay = overlays.get(docEntry.getKey()).second;
343407
if (overlay != null) {
344408
overlay.applyToLocalView(docEntry.getValue(), null, Timestamp.now());
345409
}
346410
// Finally, insert the documents that still match the query
347411
if (query.matches(docEntry.getValue())) {
348-
results = results.insert(docEntry.getKey(), docEntry.getValue());
412+
results.put(docEntry.getValue(), batchId);
349413
}
350414
}
351415

firebase-firestore/src/main/java/com/google/firebase/firestore/local/MemoryDocumentOverlayCache.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,32 @@ public Map<DocumentKey, Mutation> getOverlays(ResourcePath collection, int since
105105

106106
return result;
107107
}
108+
109+
@Override
110+
public Map<DocumentKey, Pair<Integer, Mutation>> getOverlaysWithBatchId(
111+
ResourcePath collection, int sinceBatchId) {
112+
Map<DocumentKey, Pair<Integer, Mutation>> result = new HashMap<>();
113+
114+
int immediateChildrenPathLength = collection.length() + 1;
115+
DocumentKey prefix = DocumentKey.fromPath(collection.append(""));
116+
Map<DocumentKey, Pair<Integer, Mutation>> view = overlays.tailMap(prefix);
117+
118+
for (Map.Entry<DocumentKey, Pair<Integer, Mutation>> entry : view.entrySet()) {
119+
DocumentKey key = entry.getKey();
120+
if (!collection.isPrefixOf(key.getPath())) {
121+
break;
122+
}
123+
// Documents from sub-collections
124+
if (key.getPath().length() != immediateChildrenPathLength) {
125+
continue;
126+
}
127+
128+
Pair<Integer, Mutation> batchIdToOverlay = entry.getValue();
129+
if (batchIdToOverlay.first > sinceBatchId) {
130+
result.put(entry.getKey(), batchIdToOverlay);
131+
}
132+
}
133+
134+
return result;
135+
}
108136
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteDocumentOverlayCache.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import static com.google.firebase.firestore.util.Assert.fail;
1818

19+
import android.util.Pair;
1920
import androidx.annotation.Nullable;
2021
import com.google.firebase.firestore.auth.User;
2122
import com.google.firebase.firestore.model.DocumentKey;
@@ -111,4 +112,34 @@ public Map<DocumentKey, Mutation> getOverlays(ResourcePath collection, int since
111112

112113
return result;
113114
}
115+
116+
@Override
117+
public Map<DocumentKey, Pair<Integer, Mutation>> getOverlaysWithBatchId(
118+
ResourcePath collection, int sinceBatchId) {
119+
String collectionPath = EncodedPath.encode(collection);
120+
121+
Map<DocumentKey, Pair<Integer, Mutation>> result = new HashMap<>();
122+
123+
db.query(
124+
"SELECT document_id, overlay_mutation, largest_batch_id FROM document_overlays "
125+
+ "WHERE uid = ? AND collection_path = ? AND largest_batch_id > ?")
126+
.binding(uid, collectionPath, sinceBatchId)
127+
.forEach(
128+
row -> {
129+
try {
130+
String documentId = row.getString(0);
131+
Write write = Write.parseFrom(row.getBlob(1));
132+
int largestBatchId = row.getInt(2);
133+
Mutation mutation = serializer.decodeMutation(write);
134+
135+
result.put(
136+
DocumentKey.fromPath(collection.append(documentId)),
137+
new Pair(largestBatchId, mutation));
138+
} catch (InvalidProtocolBufferException e) {
139+
throw fail("Overlay failed to parse: %s", e);
140+
}
141+
});
142+
143+
return result;
144+
}
114145
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteIndexManager.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static java.lang.Math.max;
2323

2424
import androidx.annotation.Nullable;
25+
import androidx.annotation.VisibleForTesting;
2526
import com.google.firebase.Timestamp;
2627
import com.google.firebase.firestore.auth.User;
2728
import com.google.firebase.firestore.core.Bound;
@@ -105,7 +106,8 @@ public void start() {
105106
// Fetch all index states if persisted for the user. These states contain per user information
106107
// on how up to date the index is.
107108
db.query(
108-
"SELECT index_id, sequence_number, read_time_seconds, read_time_nanos, document_key "
109+
"SELECT index_id, sequence_number, read_time_seconds, read_time_nanos, "
110+
+ "document_key, largest_batch_id "
109111
+ "FROM index_state WHERE uid = ?")
110112
.binding(uid)
111113
.forEach(
@@ -116,8 +118,10 @@ public void start() {
116118
new SnapshotVersion(new Timestamp(row.getLong(2), row.getInt(3)));
117119
DocumentKey documentKey =
118120
DocumentKey.fromPath(EncodedPath.decodeResourcePath(row.getString(4)));
119-
indexStates.put(
120-
indexId, FieldIndex.IndexState.create(sequenceNumber, readTime, documentKey));
121+
int largestBatchId = row.getInt(5);
122+
FieldIndex.IndexOffset offset =
123+
FieldIndex.IndexOffset.create(readTime, documentKey, largestBatchId);
124+
indexStates.put(indexId, FieldIndex.IndexState.create(sequenceNumber, offset));
121125
});
122126

123127
// Fetch all indices and combine with user's index state if available.
@@ -247,6 +251,7 @@ private void updateEntries(
247251
diffCollections(
248252
existingEntries,
249253
newEntries,
254+
IndexEntry.SEMANTIC_COMPARATOR,
250255
entry -> addIndexEntry(document, entry),
251256
entry -> deleteIndexEntry(document, entry));
252257
}
@@ -343,8 +348,8 @@ private void deleteIndexEntry(Document document, IndexEntry indexEntry) {
343348
document.getKey().toString());
344349
}
345350

346-
private SortedSet<IndexEntry> getExistingIndexEntries(
347-
DocumentKey documentKey, FieldIndex fieldIndex) {
351+
@VisibleForTesting
352+
SortedSet<IndexEntry> getExistingIndexEntries(DocumentKey documentKey, FieldIndex fieldIndex) {
348353
SortedSet<IndexEntry> results = new TreeSet<>();
349354
db.query(
350355
"SELECT array_value, directional_value FROM index_entries "

0 commit comments

Comments
 (0)