 
 import static com.google.firebase.firestore.util.Assert.hardAssert;
 
+import android.util.Pair;
 import androidx.annotation.Nullable;
 import androidx.annotation.VisibleForTesting;
-import com.google.firebase.database.collection.ImmutableSortedMap;
-import com.google.firebase.firestore.core.Query;
 import com.google.firebase.firestore.model.Document;
-import com.google.firebase.firestore.model.DocumentKey;
 import com.google.firebase.firestore.model.FieldIndex;
 import com.google.firebase.firestore.model.FieldIndex.IndexOffset;
-import com.google.firebase.firestore.model.ResourcePath;
 import com.google.firebase.firestore.util.AsyncQueue;
 import com.google.firebase.firestore.util.Logger;
-import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -49,15 +43,13 @@ public class IndexBackfiller {
 
   private final Scheduler scheduler;
   private final Persistence persistence;
-  private final RemoteDocumentCache remoteDocumentCache;
   private LocalDocumentsView localDocumentsView;
   private IndexManager indexManager;
   private int maxDocumentsToProcess = MAX_DOCUMENTS_TO_PROCESS;
 
   public IndexBackfiller(Persistence persistence, AsyncQueue asyncQueue) {
     this.persistence = persistence;
     this.scheduler = new Scheduler(asyncQueue);
-    this.remoteDocumentCache = persistence.getRemoteDocumentCache();
   }
 
   public void setLocalDocumentsView(LocalDocumentsView localDocumentsView) {
@@ -114,12 +106,11 @@ public Scheduler getScheduler() {
   public int backfill() {
     hardAssert(localDocumentsView != null, "setLocalDocumentsView() not called");
     hardAssert(indexManager != null, "setIndexManager() not called");
-    return persistence.runTransaction(
-        "Backfill Indexes", () -> writeIndexEntries(localDocumentsView));
+    return persistence.runTransaction("Backfill Indexes", this::writeIndexEntries);
   }
 
   /** Writes index entries until the cap is reached. Returns the number of documents processed. */
-  private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
+  private int writeIndexEntries() {
     Set<String> processedCollectionGroups = new HashSet<>();
     int documentsRemaining = maxDocumentsToProcess;
     while (documentsRemaining > 0) {
@@ -128,62 +119,39 @@ private int writeIndexEntries(LocalDocumentsView localDocumentsView) {
         break;
       }
       Logger.debug(LOG_TAG, "Processing collection: %s", collectionGroup);
-      documentsRemaining -=
-          writeEntriesForCollectionGroup(localDocumentsView, collectionGroup, documentsRemaining);
+      documentsRemaining -= writeEntriesForCollectionGroup(collectionGroup, documentsRemaining);
       processedCollectionGroups.add(collectionGroup);
     }
     return maxDocumentsToProcess - documentsRemaining;
   }
 
-  /** Writes entries for the fetched field indexes. */
+  /**
+   * Writes entries for the corresponding field indexes in the provided collection group. Returns
+   * the number of documents processed.
+   */
   private int writeEntriesForCollectionGroup(
-      LocalDocumentsView localDocumentsView,
-      String collectionGroup,
-      int documentsRemainingUnderCap) {
+      String collectionGroup, int documentsRemainingUnderCap) {
     int documentsProcessed = 0;
-    Query query = new Query(ResourcePath.EMPTY, collectionGroup);
 
     // Use the earliest offset of all field indexes to query the local cache.
     Collection<FieldIndex> fieldIndexes = indexManager.getFieldIndexes(collectionGroup);
-    IndexOffset existingOffset = getExistingOffset(fieldIndexes);
-
-    // TODO(indexing): Use limit queries to only fetch the required number of entries.
-    ImmutableSortedMap<DocumentKey, Document> documents =
-        localDocumentsView.getDocumentsMatchingQuery(query, existingOffset);
-
-    List<Document> oldestDocuments = getOldestDocuments(documents, documentsRemainingUnderCap);
-    indexManager.updateIndexEntries(oldestDocuments);
-    documentsProcessed += oldestDocuments.size();
-
-    IndexOffset newOffset = getNewOffset(oldestDocuments, existingOffset);
-
-    // Start indexing mutations only after catching up for read time. This must be done separately
-    // since the backfill's first pass is in read time order rather than mutation batch id order.
-    int documentsRemaining = documentsRemainingUnderCap - oldestDocuments.size();
-    if (documentsRemaining > 0) {
-      int earliestBatchId = getEarliestBatchId(fieldIndexes);
-      newOffset =
-          IndexOffset.create(newOffset.getReadTime(), newOffset.getDocumentKey(), earliestBatchId);
-      Map<Document, Integer> documentToBatchId =
-          localDocumentsView.getDocumentsBatchIdsMatchingCollectionGroupQuery(query, newOffset);
-
-      // Write index entries for documents touched by local mutations and update the offset to the
-      // new largest batch id.
-      if (!documentToBatchId.isEmpty()) {
-        List<Document> documentsByBatchId =
-            getDocumentsByBatchId(documentToBatchId, documentsRemaining);
-
-        // Largest batch id is the last element in documentsByBatchId since it's sorted in ascending
-        // order by batch id.
-        int newLargestBatchId =
-            documentToBatchId.get(documentsByBatchId.get(documentsByBatchId.size() - 1));
-        newOffset =
-            IndexOffset.create(
-                newOffset.getReadTime(), newOffset.getDocumentKey(), newLargestBatchId);
-        indexManager.updateIndexEntries(documentsByBatchId);
-        documentsProcessed += documentsByBatchId.size();
-      }
-    }
+    IndexOffset startingOffset = getExistingOffset(fieldIndexes);
+
+    // Holds the documents to index and the updated offset once they have been indexed.
+    Pair<IndexOffset, List<Document>> pair =
+        localDocumentsView.getNextDocumentsAndOffsetForCollectionGroup(
+            collectionGroup, startingOffset, /* updateMemoizedResults= */ true);
+    IndexOffset newOffset;
+    List<Document> documentsToIndex;
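+    // Keep fetching and indexing batches until the cap is reached or the cache has no
+    // more documents after the current offset.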
+    do {
+      newOffset = pair.first;
+      documentsToIndex = pair.second;
+      indexManager.updateIndexEntries(documentsToIndex);
+      documentsProcessed += documentsToIndex.size();
+      pair =
+          localDocumentsView.getNextDocumentsAndOffsetForCollectionGroup(
+              collectionGroup, newOffset, /* updateMemoizedResults= */ false);
+    } while (documentsProcessed < documentsRemainingUnderCap && !documentsToIndex.isEmpty());
 
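+    // Persist the final offset so the next backfill pass resumes where this one stopped.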
     indexManager.updateCollectionGroup(collectionGroup, newOffset);
     return documentsProcessed;
@@ -198,70 +166,19 @@ private IndexOffset getExistingOffset(Collection<FieldIndex> fieldIndexes) {
         lowestOffset = fieldIndex.getIndexState().getOffset();
       }
     }
-    return lowestOffset == null ? IndexOffset.NONE : lowestOffset;
-  }
-
-  /**
-   * Returns the read time and document key offset for the index based on the newly indexed
-   * documents.
-   *
-   * @param documents a list of documents sorted by read time and key (ascending)
-   * @param currentOffset the current offset of the index group
-   */
-  private IndexOffset getNewOffset(List<Document> documents, IndexOffset currentOffset) {
-    IndexOffset latestOffset =
-        documents.isEmpty()
-            ? IndexOffset.create(remoteDocumentCache.getLatestReadTime())
-            : IndexOffset.create(
-                documents.get(documents.size() - 1).getReadTime(),
-                documents.get(documents.size() - 1).getKey());
-    // Make sure the index does not go back in time
-    latestOffset = latestOffset.compareTo(currentOffset) > 0 ? latestOffset : currentOffset;
-    return latestOffset;
-  }
-
-  /** Returns up to {@code count} documents sorted by read time and key. */
-  private List<Document> getOldestDocuments(
-      ImmutableSortedMap<DocumentKey, Document> documents, int count) {
-    List<Document> oldestDocuments = new ArrayList<>();
-    for (Map.Entry<DocumentKey, Document> entry : documents) {
-      oldestDocuments.add(entry.getValue());
-    }
-    Collections.sort(
-        oldestDocuments,
-        (l, r) ->
-            IndexOffset.create(l.getReadTime(), l.getKey())
-                .compareTo(IndexOffset.create(r.getReadTime(), r.getKey())));
-    return oldestDocuments.subList(0, Math.min(count, oldestDocuments.size()));
-  }
-
-  /**
-   * Returns up to {@code count} documents sorted by batch id ascending, except in cases where the
-   * mutation batch id contains more documents. This method will always return all documents in a
-   * mutation batch, even if it goes over the limit.
-   *
-   * <p>For example, if count = 2, and we have: (docA, batch1), (docB, batch2), and (docC, batch2),
-   * all three documents will be returned since docC is also part of batch2.
-   */
-  private List<Document> getDocumentsByBatchId(
-      Map<Document, Integer> documentToBatchId, int count) {
-    List<Document> oldestDocuments = new ArrayList<>(documentToBatchId.keySet());
-    Collections.sort(
-        oldestDocuments, (l, r) -> documentToBatchId.get(l).compareTo(documentToBatchId.get(r)));
-    int i = Math.min(count, oldestDocuments.size()) - 1;
-
-    // Include all documents that match the last document's batch id.
-    int lastBatchId = documentToBatchId.get(oldestDocuments.get(i));
-    while (i < oldestDocuments.size()
-        && lastBatchId == documentToBatchId.get(oldestDocuments.get(i))) {
-      ++i;
-    }
-    return oldestDocuments.subList(0, i);
+    lowestOffset = lowestOffset == null ? IndexOffset.NONE : lowestOffset;
+
+    // Add the earliest batch id to the offset so documents from local mutations are also indexed.
+    int earliestBatchId = getEarliestBatchId(fieldIndexes);
+    lowestOffset =
+        IndexOffset.create(
+            lowestOffset.getReadTime(), lowestOffset.getDocumentKey(), earliestBatchId);
+    return lowestOffset;
   }
 
   /** Returns the earliest batch id from the specified field indexes. */
   private int getEarliestBatchId(Collection<FieldIndex> fieldIndexes) {
-    int lowestBatchId = (int) Double.POSITIVE_INFINITY;
+    int lowestBatchId = Integer.MAX_VALUE;
     for (FieldIndex fieldIndex : fieldIndexes) {
       lowestBatchId =
           Math.min(fieldIndex.getIndexState().getOffset().getLargestBatchId(), lowestBatchId);
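Note on the new call: this diff shows only the call sites of `LocalDocumentsView#getNextDocumentsAndOffsetForCollectionGroup`. Judging from its use above, it returns the next batch of documents in a collection group after the given offset, paired with the offset to resume from; `updateMemoizedResults` is passed as `true` only for the first fetch. The sketch below is a minimal, self-contained model of the batching loop under those assumptions. It substitutes stand-in types (`Map.Entry` for `android.util.Pair`, an integer position for `IndexOffset`) and is illustrative only, not the PR's implementation.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Minimal model of the backfill batching loop; all types are simplified stand-ins. */
public final class BackfillLoopSketch {

  /**
   * Stand-in for LocalDocumentsView#getNextDocumentsAndOffsetForCollectionGroup: returns up to
   * batchSize documents after the offset, paired with the offset to resume from.
   */
  static Map.Entry<Integer, List<String>> nextDocumentsAndOffset(
      List<String> cache, int offset, int batchSize) {
    int end = Math.min(offset + batchSize, cache.size());
    return new SimpleImmutableEntry<>(end, new ArrayList<>(cache.subList(offset, end)));
  }

  public static void main(String[] args) {
    List<String> cache = Arrays.asList("docA", "docB", "docC", "docD", "docE");
    int cap = 4; // plays the role of documentsRemainingUnderCap
    int processed = 0;
    int offset = 0; // plays the role of getExistingOffset(fieldIndexes)

    Map.Entry<Integer, List<String>> pair = nextDocumentsAndOffset(cache, offset, 2);
    List<String> batch;
    do {
      offset = pair.getKey(); // advance the resume point first
      batch = pair.getValue();
      processed += batch.size(); // "index" the fetched batch
      System.out.println("Indexed " + batch + ", resume offset = " + offset);
      pair = nextDocumentsAndOffset(cache, offset, 2);
    } while (processed < cap && !batch.isEmpty());

    System.out.println("Processed " + processed + " documents (cap " + cap + ")");
  }
}
```

As in the diff, the loop fetches one batch ahead; when the cap is hit, the extra batch is discarded, and the offset persisted by `updateCollectionGroup` still points at the last document actually indexed.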