18
18
19
19
import androidx .annotation .Nullable ;
20
20
import androidx .annotation .VisibleForTesting ;
21
- import com .google .firebase .Timestamp ;
22
21
import com .google .firebase .database .collection .ImmutableSortedMap ;
23
22
import com .google .firebase .firestore .core .Query ;
24
23
import com .google .firebase .firestore .model .Document ;
27
26
import com .google .firebase .firestore .model .ResourcePath ;
28
27
import com .google .firebase .firestore .model .SnapshotVersion ;
29
28
import com .google .firebase .firestore .util .AsyncQueue ;
29
+ import java .util .ArrayList ;
30
30
import java .util .Collection ;
31
+ import java .util .Collections ;
32
+ import java .util .HashSet ;
33
+ import java .util .List ;
31
34
import java .util .Map ;
32
- import java .util .PriorityQueue ;
33
- import java .util .Queue ;
35
+ import java .util .Set ;
34
36
import java .util .concurrent .TimeUnit ;
35
37
36
38
/** Implements the steps for backfilling indexes. */
@@ -44,13 +46,15 @@ public class IndexBackfiller {
44
46
45
47
private final Scheduler scheduler ;
46
48
private final Persistence persistence ;
49
+ private final RemoteDocumentCache remoteDocumentCache ;
47
50
private LocalDocumentsView localDocumentsView ;
48
51
private IndexManager indexManager ;
49
52
private int maxDocumentsToProcess = MAX_DOCUMENTS_TO_PROCESS ;
50
53
51
54
public IndexBackfiller (Persistence persistence , AsyncQueue asyncQueue ) {
52
55
this .persistence = persistence ;
53
56
this .scheduler = new Scheduler (asyncQueue );
57
+ this .remoteDocumentCache = persistence .getRemoteDocumentCache ();
54
58
}
55
59
56
60
public void setLocalDocumentsView (LocalDocumentsView localDocumentsView ) {
@@ -131,71 +135,92 @@ public Results backfill() {
131
135
return persistence .runTransaction (
132
136
"Backfill Indexes" ,
133
137
() -> {
134
- // TODO(indexing): Handle field indexes that are removed by the user.
135
138
int documentsProcessed = writeIndexEntries (localDocumentsView );
136
139
return new Results (/* hasRun= */ true , documentsProcessed );
137
140
});
138
141
}
139
142
140
143
/** Writes index entries until the cap is reached. Returns the number of documents processed. */
141
144
private int writeIndexEntries (LocalDocumentsView localDocumentsView ) {
142
- int documentsProcessed = 0 ;
143
- Timestamp startingTimestamp = Timestamp .now ();
144
-
145
- while (documentsProcessed < maxDocumentsToProcess ) {
146
- int documentsRemaining = maxDocumentsToProcess - documentsProcessed ;
147
- String collectionGroup = indexManager .getNextCollectionGroupToUpdate (startingTimestamp );
148
- if (collectionGroup == null ) {
145
+ Set <String > processedCollectionGroups = new HashSet <>();
146
+ int documentsRemaining = maxDocumentsToProcess ;
147
+ while (documentsRemaining > 0 ) {
148
+ String collectionGroup = indexManager .getNextCollectionGroupToUpdate ();
149
+ if (collectionGroup == null || processedCollectionGroups .contains (collectionGroup )) {
149
150
break ;
150
151
}
151
- documentsProcessed + =
152
+ documentsRemaining - =
152
153
writeEntriesForCollectionGroup (localDocumentsView , collectionGroup , documentsRemaining );
154
+ processedCollectionGroups .add (collectionGroup );
153
155
}
154
-
155
- return documentsProcessed ;
156
+ return maxDocumentsToProcess - documentsRemaining ;
156
157
}
157
158
158
159
/** Writes entries for the fetched field indexes. */
159
160
private int writeEntriesForCollectionGroup (
160
161
LocalDocumentsView localDocumentsView , String collectionGroup , int entriesRemainingUnderCap ) {
161
162
Query query = new Query (ResourcePath .EMPTY , collectionGroup );
162
163
163
- // Use the earliest updateTime of all field indexes as the base updateTime .
164
- SnapshotVersion earliestUpdateTime =
165
- getEarliestUpdateTime (indexManager .getFieldIndexes (collectionGroup ));
164
+ // Use the earliest readTime of all field indexes as the base readtime .
165
+ SnapshotVersion earliestReadTime =
166
+ getEarliestReadTime (indexManager .getFieldIndexes (collectionGroup ));
166
167
167
168
// TODO(indexing): Use limit queries to only fetch the required number of entries.
168
169
// TODO(indexing): Support mutation batch Ids when sorting and writing indexes.
169
170
ImmutableSortedMap <DocumentKey , Document > documents =
170
- localDocumentsView .getDocumentsMatchingQuery (query , earliestUpdateTime );
171
+ localDocumentsView .getDocumentsMatchingQuery (query , earliestReadTime );
171
172
172
- Queue <Document > oldestDocuments = getOldestDocuments (documents , entriesRemainingUnderCap );
173
+ List <Document > oldestDocuments = getOldestDocuments (documents , entriesRemainingUnderCap );
173
174
indexManager .updateIndexEntries (oldestDocuments );
175
+
176
+ SnapshotVersion latestReadTime = getPostUpdateReadTime (oldestDocuments , earliestReadTime );
177
+ indexManager .updateCollectionGroup (collectionGroup , latestReadTime );
174
178
return oldestDocuments .size ();
175
179
}
176
180
181
+ /**
182
+ * Returns the new read time for the index.
183
+ *
184
+ * @param documents a list of documents sorted by read time (ascending)
185
+ * @param currentReadTime the current read time of the index
186
+ */
187
+ private SnapshotVersion getPostUpdateReadTime (
188
+ List <Document > documents , SnapshotVersion currentReadTime ) {
189
+ SnapshotVersion latestReadTime =
190
+ documents .isEmpty ()
191
+ ? remoteDocumentCache .getLatestReadTime ()
192
+ : documents .get (documents .size () - 1 ).getReadTime ();
193
+ // Make sure the index does not go back in time
194
+ latestReadTime =
195
+ latestReadTime .compareTo (currentReadTime ) > 0 ? latestReadTime : currentReadTime ;
196
+ return latestReadTime ;
197
+ }
198
+
177
199
/** Returns up to {@code count} documents sorted by read time. */
178
- private Queue <Document > getOldestDocuments (
200
+ private List <Document > getOldestDocuments (
179
201
ImmutableSortedMap <DocumentKey , Document > documents , int count ) {
180
- Queue <Document > oldestDocuments =
181
- new PriorityQueue <>(count + 1 , (l , r ) -> r .getReadTime ().compareTo (l .getReadTime ()));
202
+ List <Document > oldestDocuments = new ArrayList <>();
182
203
for (Map .Entry <DocumentKey , Document > entry : documents ) {
183
204
oldestDocuments .add (entry .getValue ());
184
- if (oldestDocuments .size () > count ) {
185
- oldestDocuments .poll ();
186
- }
187
205
}
188
- return oldestDocuments ;
206
+ Collections .sort (
207
+ oldestDocuments ,
208
+ (l , r ) -> {
209
+ int cmp = l .getReadTime ().compareTo (r .getReadTime ());
210
+ if (cmp != 0 ) return cmp ;
211
+ return l .getKey ().compareTo (r .getKey ());
212
+ });
213
+ return oldestDocuments .subList (0 , Math .min (count , oldestDocuments .size ()));
189
214
}
190
215
191
- private SnapshotVersion getEarliestUpdateTime (Collection <FieldIndex > fieldIndexes ) {
216
+ private SnapshotVersion getEarliestReadTime (Collection <FieldIndex > fieldIndexes ) {
192
217
SnapshotVersion lowestVersion = null ;
193
218
for (FieldIndex fieldIndex : fieldIndexes ) {
194
219
lowestVersion =
195
220
lowestVersion == null
196
- ? fieldIndex .getUpdateTime ()
197
- : fieldIndex .getUpdateTime ().compareTo (lowestVersion ) < 0
198
- ? fieldIndex .getUpdateTime ()
221
+ ? fieldIndex .getIndexState (). getReadTime ()
222
+ : fieldIndex .getIndexState (). getReadTime ().compareTo (lowestVersion ) < 0
223
+ ? fieldIndex .getIndexState (). getReadTime ()
199
224
: lowestVersion ;
200
225
}
201
226
return lowestVersion ;
0 commit comments