@@ -325,54 +325,41 @@ public SnapshotVersion getLastRemoteSnapshotVersion() {
325
325
* <p>LocalDocuments are re-calculated if there are remaining mutations in the queue.
326
326
*/
327
327
public ImmutableSortedMap <DocumentKey , MaybeDocument > applyRemoteEvent (RemoteEvent remoteEvent ) {
328
+ SnapshotVersion remoteVersion = remoteEvent .getSnapshotVersion ();
329
+
328
330
// TODO: Call queryEngine.handleDocumentChange() appropriately.
329
331
return persistence .runTransaction (
330
332
"Apply remote event" ,
331
333
() -> {
334
+ Map <Integer , TargetChange > targetChanges = remoteEvent .getTargetChanges ();
332
335
long sequenceNumber = persistence .getReferenceDelegate ().getCurrentSequenceNumber ();
333
- Set <DocumentKey > authoritativeUpdates = new HashSet <>();
334
336
335
- Map <Integer , TargetChange > targetChanges = remoteEvent .getTargetChanges ();
336
337
for (Map .Entry <Integer , TargetChange > entry : targetChanges .entrySet ()) {
337
338
Integer boxedTargetId = entry .getKey ();
338
339
int targetId = boxedTargetId ;
339
340
TargetChange change = entry .getValue ();
340
341
341
- // Do not ref/unref unassigned targetIds - it may lead to leaks.
342
- QueryData queryData = targetIds .get (targetId );
343
- if (queryData == null ) {
342
+ QueryData oldQueryData = targetIds .get (targetId );
343
+ if (oldQueryData == null ) {
344
+ // We don't update the remote keys if the query is not active. This ensures that
345
+ // we persist the updated query data along with the updated assignment.
344
346
continue ;
345
347
}
346
348
347
- // When a global snapshot contains updates (either add or modify) we can completely
348
- // trust these updates as authoritative and blindly apply them to our cache (as a
349
- // defensive measure to promote self-healing in the unfortunate case that our cache
350
- // is ever somehow corrupted / out-of-sync).
351
- //
352
- // If the document is only updated while removing it from a target then watch isn't
353
- // obligated to send the absolute latest version: it can send the first version that
354
- // caused the document not to match.
355
- for (DocumentKey key : change .getAddedDocuments ()) {
356
- authoritativeUpdates .add (key );
357
- }
358
- for (DocumentKey key : change .getModifiedDocuments ()) {
359
- authoritativeUpdates .add (key );
360
- }
361
-
362
349
queryCache .removeMatchingKeys (change .getRemovedDocuments (), targetId );
363
350
queryCache .addMatchingKeys (change .getAddedDocuments (), targetId );
364
351
365
- // Update the resume token if the change includes one. Don't clear any preexisting
366
- // value.
367
352
ByteString resumeToken = change .getResumeToken ();
353
+ // Update the resume token if the change includes one.
368
354
if (!resumeToken .isEmpty ()) {
369
- QueryData oldQueryData = queryData ;
370
- queryData =
371
- queryData .copy (remoteEvent .getSnapshotVersion (), resumeToken , sequenceNumber );
372
- targetIds .put (boxedTargetId , queryData );
373
-
374
- if (shouldPersistQueryData (oldQueryData , queryData , change )) {
375
- queryCache .updateQueryData (queryData );
355
+ QueryData newQueryData =
356
+ oldQueryData .copy (remoteVersion , resumeToken , sequenceNumber );
357
+ targetIds .put (boxedTargetId , newQueryData );
358
+
359
+ // Update the query data if there are target changes (or if sufficient time has
360
+ // passed since the last update).
361
+ if (shouldPersistQueryData (oldQueryData , newQueryData , change )) {
362
+ queryCache .updateQueryData (newQueryData );
376
363
}
377
364
}
378
365
}
@@ -391,10 +378,9 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve
391
378
MaybeDocument existingDoc = existingDocs .get (key );
392
379
393
380
if (existingDoc == null
394
- || (authoritativeUpdates .contains (doc .getKey ()) && !existingDoc .hasPendingWrites ())
395
- || doc .getVersion ().compareTo (existingDoc .getVersion ()) >= 0 ) {
396
- // If a document update isn't authoritative, make sure we don't apply an old document
397
- // version to the remote cache.
381
+ || doc .getVersion ().compareTo (existingDoc .getVersion ()) > 0
382
+ || (doc .getVersion ().compareTo (existingDoc .getVersion ()) == 0
383
+ && existingDoc .hasPendingWrites ())) {
398
384
remoteDocuments .add (doc );
399
385
changedDocs .put (key , doc );
400
386
} else if (doc instanceof NoDocument && doc .getVersion ().equals (SnapshotVersion .NONE )) {
@@ -422,7 +408,6 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve
422
408
// remote events when we get permission denied errors while trying to resolve the
423
409
// state of a locally cached document that is in limbo.
424
410
SnapshotVersion lastRemoteVersion = queryCache .getLastRemoteSnapshotVersion ();
425
- SnapshotVersion remoteVersion = remoteEvent .getSnapshotVersion ();
426
411
if (!remoteVersion .equals (SnapshotVersion .NONE )) {
427
412
hardAssert (
428
413
remoteVersion .compareTo (lastRemoteVersion ) >= 0 ,
@@ -448,10 +433,11 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve
448
433
*/
449
434
private static boolean shouldPersistQueryData (
450
435
QueryData oldQueryData , QueryData newQueryData , TargetChange change ) {
451
- // Avoid clearing any existing value
452
- if (newQueryData .getResumeToken ().isEmpty ()) return false ;
436
+ hardAssert (
437
+ !newQueryData .getResumeToken ().isEmpty (),
438
+ "Attempted to persist query data with empty resume token" );
453
439
454
- // Any resume token is interesting if there isn 't one already.
440
+ // Always persist query data if we don 't already have a resume token .
455
441
if (oldQueryData .getResumeToken ().isEmpty ()) return true ;
456
442
457
443
// Don't allow resume token changes to be buffered indefinitely. This allows us to be reasonably
0 commit comments