@@ -449,37 +449,23 @@ export class LocalStore {
449
449
*/
450
450
applyRemoteEvent ( remoteEvent : RemoteEvent ) : Promise < MaybeDocumentMap > {
451
451
const documentBuffer = this . remoteDocuments . newChangeBuffer ( ) ;
452
+ const snapshotVersion = remoteEvent . snapshotVersion ;
452
453
return this . persistence . runTransaction (
453
454
'Apply remote event' ,
454
455
'readwrite-primary' ,
455
456
txn => {
456
457
const promises = [ ] as Array < PersistencePromise < void > > ;
457
- let authoritativeUpdates = documentKeySet ( ) ;
458
458
objUtils . forEachNumber (
459
459
remoteEvent . targetChanges ,
460
460
( targetId : TargetId , change : TargetChange ) => {
461
- // Do not ref/unref unassigned targetIds - it may lead to leaks.
462
- let queryData = this . queryDataByTarget [ targetId ] ;
463
- if ( ! queryData ) {
461
+ const oldQueryData = this . queryDataByTarget [ targetId ] ;
462
+ if ( ! oldQueryData ) {
464
463
return ;
465
464
}
466
465
467
- // When a global snapshot contains updates (either add or modify) we
468
- // can completely trust these updates as authoritative and blindly
469
- // apply them to our cache (as a defensive measure to promote
470
- // self-healing in the unfortunate case that our cache is ever somehow
471
- // corrupted / out-of-sync).
472
- //
473
- // If the document is only updated while removing it from a target
474
- // then watch isn't obligated to send the absolute latest version: it
475
- // can send the first version that caused the document not to match.
476
- change . addedDocuments . forEach ( key => {
477
- authoritativeUpdates = authoritativeUpdates . add ( key ) ;
478
- } ) ;
479
- change . modifiedDocuments . forEach ( key => {
480
- authoritativeUpdates = authoritativeUpdates . add ( key ) ;
481
- } ) ;
482
-
466
+ // Only update the remote keys if the query is still active. This
467
+ // ensures that we can persist the updated query data along with
468
+ // the updated assignment.
483
469
promises . push (
484
470
this . queryCache
485
471
. removeMatchingKeys ( txn , change . removedDocuments , targetId )
@@ -492,25 +478,27 @@ export class LocalStore {
492
478
} )
493
479
) ;
494
480
495
- // Update the resume token if the change includes one. Don't clear
496
- // any preexisting value.
497
481
const resumeToken = change . resumeToken ;
482
+ // Update the resume token if the change includes one.
498
483
if ( resumeToken . length > 0 ) {
499
- const oldQueryData = queryData ;
500
- queryData = queryData . copy ( {
484
+ const newQueryData = oldQueryData . copy ( {
501
485
resumeToken,
502
- snapshotVersion : remoteEvent . snapshotVersion
486
+ snapshotVersion
503
487
} ) ;
504
- this . queryDataByTarget [ targetId ] = queryData ;
488
+ this . queryDataByTarget [ targetId ] = newQueryData ;
505
489
490
+ // Update the query data if there are target changes (or if
491
+ // sufficient time has passed since the last update).
506
492
if (
507
493
LocalStore . shouldPersistQueryData (
508
494
oldQueryData ,
509
- queryData ,
495
+ newQueryData ,
510
496
change
511
497
)
512
498
) {
513
- promises . push ( this . queryCache . updateQueryData ( txn , queryData ) ) ;
499
+ promises . push (
500
+ this . queryCache . updateQueryData ( txn , newQueryData )
501
+ ) ;
514
502
}
515
503
}
516
504
}
@@ -528,19 +516,12 @@ export class LocalStore {
528
516
documentBuffer . getEntries ( txn , updatedKeys ) . next ( existingDocs => {
529
517
remoteEvent . documentUpdates . forEach ( ( key , doc ) => {
530
518
const existingDoc = existingDocs . get ( key ) ;
531
- // If a document update isn't authoritative, make sure we don't
532
- // apply an old document version to the remote cache. We make an
533
- // exception for SnapshotVersion.MIN which can happen for
534
- // manufactured events (e.g. in the case of a limbo document
535
- // resolution failing).
536
519
if (
537
520
existingDoc == null ||
538
- ( authoritativeUpdates . has ( doc . key ) &&
539
- ! existingDoc . hasPendingWrites ) ||
540
- doc . version . compareTo ( existingDoc . version ) >= 0
521
+ doc . version . compareTo ( existingDoc . version ) > 0 ||
522
+ ( doc . version . compareTo ( existingDoc . version ) === 0 &&
523
+ existingDoc . hasPendingWrites )
541
524
) {
542
- // If a document update isn't authoritative, make sure we don't apply an old document
543
- // version to the remote cache.
544
525
documentBuffer . addEntry ( doc ) ;
545
526
changedDocs = changedDocs . insert ( key , doc ) ;
546
527
} else if (
@@ -580,22 +561,21 @@ export class LocalStore {
580
561
// can synthesize remote events when we get permission denied errors while
581
562
// trying to resolve the state of a locally cached document that is in
582
563
// limbo.
583
- const remoteVersion = remoteEvent . snapshotVersion ;
584
- if ( ! remoteVersion . isEqual ( SnapshotVersion . MIN ) ) {
564
+ if ( ! snapshotVersion . isEqual ( SnapshotVersion . MIN ) ) {
585
565
const updateRemoteVersion = this . queryCache
586
566
. getLastRemoteSnapshotVersion ( txn )
587
- . next ( lastRemoteVersion => {
567
+ . next ( lastRemoteSnapshotVersion => {
588
568
assert (
589
- remoteVersion . compareTo ( lastRemoteVersion ) >= 0 ,
569
+ snapshotVersion . compareTo ( lastRemoteSnapshotVersion ) >= 0 ,
590
570
'Watch stream reverted to previous snapshot?? ' +
591
- remoteVersion +
571
+ snapshotVersion +
592
572
' < ' +
593
- lastRemoteVersion
573
+ lastRemoteSnapshotVersion
594
574
) ;
595
575
return this . queryCache . setTargetsMetadata (
596
576
txn ,
597
577
txn . currentSequenceNumber ,
598
- remoteVersion
578
+ snapshotVersion
599
579
) ;
600
580
} ) ;
601
581
promises . push ( updateRemoteVersion ) ;
@@ -629,12 +609,12 @@ export class LocalStore {
629
609
newQueryData : QueryData ,
630
610
change : TargetChange
631
611
) : boolean {
632
- // Avoid clearing any existing value
633
- if ( newQueryData . resumeToken . length === 0 ) {
634
- return false ;
635
- }
612
+ assert (
613
+ newQueryData . resumeToken . length > 0 ,
614
+ 'Attempted to persist query data with no resume token'
615
+ ) ;
636
616
637
- // Any resume token is interesting if there isn 't one already.
617
+ // Always persist query data if we don 't already have a resume token .
638
618
if ( oldQueryData . resumeToken . length === 0 ) {
639
619
return true ;
640
620
}
0 commit comments