18
18
19
19
import androidx .annotation .Nullable ;
20
20
import androidx .annotation .VisibleForTesting ;
21
- import com .google .firebase .Timestamp ;
22
21
import com .google .firebase .database .collection .ImmutableSortedMap ;
23
22
import com .google .firebase .firestore .core .Query ;
24
23
import com .google .firebase .firestore .model .Document ;
27
26
import com .google .firebase .firestore .model .ResourcePath ;
28
27
import com .google .firebase .firestore .model .SnapshotVersion ;
29
28
import com .google .firebase .firestore .util .AsyncQueue ;
29
+ import java .util .ArrayList ;
30
30
import java .util .Collection ;
31
+ import java .util .Collections ;
32
+ import java .util .List ;
31
33
import java .util .Map ;
32
- import java .util .PriorityQueue ;
33
- import java .util .Queue ;
34
34
import java .util .concurrent .TimeUnit ;
35
35
36
36
/** Implements the steps for backfilling indexes. */
@@ -44,13 +44,16 @@ public class IndexBackfiller {
44
44
45
45
private final Scheduler scheduler ;
46
46
private final Persistence persistence ;
47
+ private final RemoteDocumentCache remoteDocumentCache ;
47
48
private LocalDocumentsView localDocumentsView ;
48
49
private IndexManager indexManager ;
49
50
private int maxDocumentsToProcess = MAX_DOCUMENTS_TO_PROCESS ;
51
+ private long currentSequenceNumber = -1 ;
50
52
51
53
public IndexBackfiller (Persistence persistence , AsyncQueue asyncQueue ) {
52
54
this .persistence = persistence ;
53
55
this .scheduler = new Scheduler (asyncQueue );
56
+ this .remoteDocumentCache = persistence .getRemoteDocumentCache ();
54
57
}
55
58
56
59
public void setLocalDocumentsView (LocalDocumentsView localDocumentsView ) {
@@ -131,7 +134,7 @@ public Results backfill() {
131
134
return persistence .runTransaction (
132
135
"Backfill Indexes" ,
133
136
() -> {
134
- // TODO(indexing): Handle field indexes that are removed by the user.
137
+ currentSequenceNumber = indexManager . getHighestSequenceNumber () + 1 ;
135
138
int documentsProcessed = writeIndexEntries (localDocumentsView );
136
139
return new Results (/* hasRun= */ true , documentsProcessed );
137
140
});
@@ -140,11 +143,10 @@ public Results backfill() {
140
143
/** Writes index entries until the cap is reached. Returns the number of documents processed. */
141
144
private int writeIndexEntries (LocalDocumentsView localDocumentsView ) {
142
145
int documentsProcessed = 0 ;
143
- Timestamp startingTimestamp = Timestamp .now ();
144
146
145
147
while (documentsProcessed < maxDocumentsToProcess ) {
146
148
int documentsRemaining = maxDocumentsToProcess - documentsProcessed ;
147
- String collectionGroup = indexManager .getNextCollectionGroupToUpdate (startingTimestamp );
149
+ String collectionGroup = indexManager .getNextCollectionGroupToUpdate (currentSequenceNumber );
148
150
if (collectionGroup == null ) {
149
151
break ;
150
152
}
@@ -160,42 +162,61 @@ private int writeEntriesForCollectionGroup(
160
162
LocalDocumentsView localDocumentsView , String collectionGroup , int entriesRemainingUnderCap ) {
161
163
Query query = new Query (ResourcePath .EMPTY , collectionGroup );
162
164
163
- // Use the earliest updateTime of all field indexes as the base updateTime .
164
- SnapshotVersion earliestUpdateTime =
165
- getEarliestUpdateTime (indexManager .getFieldIndexes (collectionGroup ));
165
+ // Use the earliest readTime of all field indexes as the base readtime .
166
+ SnapshotVersion earliestReadTime =
167
+ getEarliestReadTime (indexManager .getFieldIndexes (collectionGroup ));
166
168
167
169
// TODO(indexing): Use limit queries to only fetch the required number of entries.
168
170
// TODO(indexing): Support mutation batch Ids when sorting and writing indexes.
169
171
ImmutableSortedMap <DocumentKey , Document > documents =
170
- localDocumentsView .getDocumentsMatchingQuery (query , earliestUpdateTime );
172
+ localDocumentsView .getDocumentsMatchingQuery (query , earliestReadTime );
171
173
172
- Queue <Document > oldestDocuments = getOldestDocuments (documents , entriesRemainingUnderCap );
174
+ List <Document > oldestDocuments = getOldestDocuments (documents , entriesRemainingUnderCap );
173
175
indexManager .updateIndexEntries (oldestDocuments );
176
+
177
+ SnapshotVersion latestReadTime = getPostUpdateReadTime (oldestDocuments , earliestReadTime );
178
+ indexManager .updateCollectionGroup (collectionGroup , currentSequenceNumber , latestReadTime );
174
179
return oldestDocuments .size ();
175
180
}
176
181
182
+ /** Returns the new read time for the index. */
183
+ private SnapshotVersion getPostUpdateReadTime (
184
+ List <Document > documents , SnapshotVersion currentReadTime ) {
185
+ SnapshotVersion latestReadTime =
186
+ documents .isEmpty ()
187
+ ? remoteDocumentCache .getLatestReadTime ()
188
+ : documents .get (documents .size () - 1 ).getReadTime ();
189
+ // Make sure the index does not go back in time
190
+ latestReadTime =
191
+ latestReadTime .compareTo (currentReadTime ) > 0 ? latestReadTime : currentReadTime ;
192
+ return latestReadTime ;
193
+ }
194
+
177
195
/** Returns up to {@code count} documents sorted by read time. */
178
- private Queue <Document > getOldestDocuments (
196
+ private List <Document > getOldestDocuments (
179
197
ImmutableSortedMap <DocumentKey , Document > documents , int count ) {
180
- Queue <Document > oldestDocuments =
181
- new PriorityQueue <>(count + 1 , (l , r ) -> r .getReadTime ().compareTo (l .getReadTime ()));
198
+ List <Document > oldestDocuments = new ArrayList <>();
182
199
for (Map .Entry <DocumentKey , Document > entry : documents ) {
183
200
oldestDocuments .add (entry .getValue ());
184
- if (oldestDocuments .size () > count ) {
185
- oldestDocuments .poll ();
186
- }
187
201
}
188
- return oldestDocuments ;
202
+ Collections .sort (
203
+ oldestDocuments ,
204
+ (l , r ) -> {
205
+ int cmp = l .getReadTime ().compareTo (r .getReadTime ());
206
+ if (cmp != 0 ) return cmp ;
207
+ return l .getKey ().compareTo (r .getKey ());
208
+ });
209
+ return oldestDocuments .subList (0 , Math .min (count , oldestDocuments .size ()));
189
210
}
190
211
191
- private SnapshotVersion getEarliestUpdateTime (Collection <FieldIndex > fieldIndexes ) {
212
+ private SnapshotVersion getEarliestReadTime (Collection <FieldIndex > fieldIndexes ) {
192
213
SnapshotVersion lowestVersion = null ;
193
214
for (FieldIndex fieldIndex : fieldIndexes ) {
194
215
lowestVersion =
195
216
lowestVersion == null
196
- ? fieldIndex .getUpdateTime ()
197
- : fieldIndex .getUpdateTime ().compareTo (lowestVersion ) < 0
198
- ? fieldIndex .getUpdateTime ()
217
+ ? fieldIndex .getIndexState (). getReadTime ()
218
+ : fieldIndex .getIndexState (). getReadTime ().compareTo (lowestVersion ) < 0
219
+ ? fieldIndex .getIndexState (). getReadTime ()
199
220
: lowestVersion ;
200
221
}
201
222
return lowestVersion ;
0 commit comments