24
24
import com .google .firebase .database .collection .ImmutableSortedMap ;
25
25
import com .google .firebase .database .collection .ImmutableSortedSet ;
26
26
import com .google .firebase .firestore .auth .User ;
27
+ import com .google .firebase .firestore .bundle .BundleMetadata ;
28
+ import com .google .firebase .firestore .bundle .NamedQuery ;
27
29
import com .google .firebase .firestore .core .Query ;
28
30
import com .google .firebase .firestore .core .Target ;
29
31
import com .google .firebase .firestore .core .TargetIdGenerator ;
32
34
import com .google .firebase .firestore .model .MaybeDocument ;
33
35
import com .google .firebase .firestore .model .NoDocument ;
34
36
import com .google .firebase .firestore .model .ObjectValue ;
37
+ import com .google .firebase .firestore .model .ResourcePath ;
35
38
import com .google .firebase .firestore .model .SnapshotVersion ;
36
39
import com .google .firebase .firestore .model .mutation .Mutation ;
37
40
import com .google .firebase .firestore .model .mutation .MutationBatch ;
@@ -121,6 +124,9 @@ public final class LocalStore {
121
124
/** Maps a query to the data about that query. */
122
125
private final TargetCache targetCache ;
123
126
127
+ /** Holds information about the bundles loaded into the SDK. */
128
+ private final BundleCache bundleCache ;
129
+
124
130
/** Maps a targetId to data about its query. */
125
131
private final SparseArray <TargetData > queryDataByTarget ;
126
132
@@ -135,6 +141,7 @@ public LocalStore(Persistence persistence, QueryEngine queryEngine, User initial
135
141
persistence .isStarted (), "LocalStore was passed an unstarted persistence implementation" );
136
142
this .persistence = persistence ;
137
143
targetCache = persistence .getTargetCache ();
144
+ bundleCache = persistence .getBundleCache ();
138
145
targetIdGenerator = TargetIdGenerator .forTargetCache (targetCache .getHighestTargetId ());
139
146
mutationQueue = persistence .getMutationQueue (initialUser );
140
147
remoteDocuments = persistence .getRemoteDocumentCache ();
@@ -372,51 +379,18 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve
372
379
}
373
380
}
374
381
375
- Map <DocumentKey , MaybeDocument > changedDocs = new HashMap <>();
376
382
Map <DocumentKey , MaybeDocument > documentUpdates = remoteEvent .getDocumentUpdates ();
377
383
Set <DocumentKey > limboDocuments = remoteEvent .getResolvedLimboDocuments ();
378
- // Each loop iteration only affects its "own" doc, so it's safe to get all the remote
379
- // documents in advance in a single call.
380
- Map <DocumentKey , MaybeDocument > existingDocs =
381
- remoteDocuments .getAll (documentUpdates .keySet ());
382
-
383
- for (Entry <DocumentKey , MaybeDocument > entry : documentUpdates .entrySet ()) {
384
- DocumentKey key = entry .getKey ();
385
- MaybeDocument doc = entry .getValue ();
386
- MaybeDocument existingDoc = existingDocs .get (key );
387
-
388
- // Note: The order of the steps below is important, since we want to ensure that
389
- // rejected limbo resolutions (which fabricate NoDocuments with SnapshotVersion.NONE)
390
- // never add documents to cache.
391
- if (doc instanceof NoDocument && doc .getVersion ().equals (SnapshotVersion .NONE )) {
392
- // NoDocuments with SnapshotVersion.NONE are used in manufactured events. We remove
393
- // these documents from cache since we lost access.
394
- remoteDocuments .remove (doc .getKey ());
395
- changedDocs .put (key , doc );
396
- } else if (existingDoc == null
397
- || doc .getVersion ().compareTo (existingDoc .getVersion ()) > 0
398
- || (doc .getVersion ().compareTo (existingDoc .getVersion ()) == 0
399
- && existingDoc .hasPendingWrites ())) {
400
- hardAssert (
401
- !SnapshotVersion .NONE .equals (remoteEvent .getSnapshotVersion ()),
402
- "Cannot add a document when the remote version is zero" );
403
- remoteDocuments .add (doc , remoteEvent .getSnapshotVersion ());
404
- changedDocs .put (key , doc );
405
- } else {
406
- Logger .debug (
407
- "LocalStore" ,
408
- "Ignoring outdated watch update for %s."
409
- + "Current version: %s Watch version: %s" ,
410
- key ,
411
- existingDoc .getVersion (),
412
- doc .getVersion ());
413
- }
414
384
385
+ for (DocumentKey key : documentUpdates .keySet ()) {
415
386
if (limboDocuments .contains (key )) {
416
387
persistence .getReferenceDelegate ().updateLimboDocument (key );
417
388
}
418
389
}
419
390
391
+ Map <DocumentKey , MaybeDocument > changedDocs =
392
+ populateDocumentChanges (documentUpdates , null , remoteEvent .getSnapshotVersion ());
393
+
420
394
// HACK: The only reason we allow snapshot version NONE is so that we can synthesize
421
395
// remote events when we get permission denied errors while trying to resolve the
422
396
// state of a locally cached document that is in limbo.
@@ -434,6 +408,65 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve
434
408
});
435
409
}
436
410
411
+ /**
412
+ * Populates the remote document cache with documents from backend or a bundle. Returns the
413
+ * document changes resulting from applying those documents.
414
+ *
415
+ * <p>Note: this function will use `documentVersions` if it is defined. When it is not defined, it
416
+ * resorts to `globalVersion`.
417
+ *
418
+ * @param documents Documents to be applied.
419
+ * @param documentVersions A DocumentKey-to-SnapshotVersion map if documents have their own read
420
+ * time.
421
+ * @param globalVersion A SnapshotVersion representing the read time if all documents have the
422
+ * same read time.
423
+ */
424
+ private Map <DocumentKey , MaybeDocument > populateDocumentChanges (
425
+ Map <DocumentKey , MaybeDocument > documents ,
426
+ @ Nullable Map <DocumentKey , SnapshotVersion > documentVersions ,
427
+ SnapshotVersion globalVersion ) {
428
+ Map <DocumentKey , MaybeDocument > changedDocs = new HashMap <>();
429
+
430
+ // Each loop iteration only affects its "own" doc, so it's safe to get all the remote
431
+ // documents in advance in a single call.
432
+ Map <DocumentKey , MaybeDocument > existingDocs = remoteDocuments .getAll (documents .keySet ());
433
+
434
+ for (Entry <DocumentKey , MaybeDocument > entry : documents .entrySet ()) {
435
+ DocumentKey key = entry .getKey ();
436
+ MaybeDocument doc = entry .getValue ();
437
+ MaybeDocument existingDoc = existingDocs .get (key );
438
+ SnapshotVersion readTime =
439
+ documentVersions != null ? documentVersions .get (key ) : globalVersion ;
440
+
441
+ // Note: The order of the steps below is important, since we want to ensure that
442
+ // rejected limbo resolutions (which fabricate NoDocuments with SnapshotVersion.NONE)
443
+ // never add documents to cache.
444
+ if (doc instanceof NoDocument && doc .getVersion ().equals (SnapshotVersion .NONE )) {
445
+ // NoDocuments with SnapshotVersion.NONE are used in manufactured events. We remove
446
+ // these documents from cache since we lost access.
447
+ remoteDocuments .remove (doc .getKey ());
448
+ changedDocs .put (key , doc );
449
+ } else if (existingDoc == null
450
+ || doc .getVersion ().compareTo (existingDoc .getVersion ()) > 0
451
+ || (doc .getVersion ().compareTo (existingDoc .getVersion ()) == 0
452
+ && existingDoc .hasPendingWrites ())) {
453
+ hardAssert (
454
+ !SnapshotVersion .NONE .equals (readTime ),
455
+ "Cannot add a document when the remote version is zero" );
456
+ remoteDocuments .add (doc , readTime );
457
+ changedDocs .put (key , doc );
458
+ } else {
459
+ Logger .debug (
460
+ "LocalStore" ,
461
+ "Ignoring outdated watch update for %s." + "Current version: %s Watch version: %s" ,
462
+ key ,
463
+ existingDoc .getVersion (),
464
+ doc .getVersion ());
465
+ }
466
+ }
467
+ return changedDocs ;
468
+ }
469
+
437
470
/**
438
471
* Returns true if the newTargetData should be persisted during an update of an active target.
439
472
* TargetData should always be persisted when a target is being released and should not call this
@@ -575,6 +608,102 @@ TargetData getTargetData(Target target) {
575
608
return targetCache .getTargetData (target );
576
609
}
577
610
611
+ /**
612
+ * Returns a boolean indicating if the given bundle has already been loaded and its create time is
613
+ * newer than the currently loading bundle.
614
+ */
615
+ public boolean hasNewerBundle (BundleMetadata bundleMetadata ) {
616
+ return persistence .runTransaction (
617
+ "Has newer bundle" ,
618
+ () -> {
619
+ BundleMetadata existingMetadata =
620
+ bundleCache .getBundleMetadata (bundleMetadata .getBundleId ());
621
+ return existingMetadata != null
622
+ && existingMetadata .getCreateTime ().compareTo (bundleMetadata .getCreateTime ()) > 0 ;
623
+ });
624
+ }
625
+
626
+ /** Saves the given BundleMetadata to local persistence. */
627
+ public void saveBundle (BundleMetadata bundleMetadata ) {
628
+ persistence .runTransaction (
629
+ "Save bundle" ,
630
+ () -> {
631
+ bundleCache .saveBundleMetadata (bundleMetadata );
632
+ });
633
+ }
634
+
635
+ /**
636
+ * Applies the documents from a bundle to the "ground-state" (remote) documents.
637
+ *
638
+ * <p>LocalDocuments are re-calculated if there are remaining mutations in the queue.
639
+ */
640
+ public ImmutableSortedMap <DocumentKey , MaybeDocument > applyBundledDocuments (
641
+ ImmutableSortedMap <DocumentKey , MaybeDocument > documents , String bundleId ) {
642
+ // Allocates a target to hold all document keys from the bundle, such that
643
+ // they will not get garbage collected right away.
644
+ TargetData umbrellaTargetData = allocateTarget (newUmbrellaTarget (bundleId ));
645
+
646
+ return persistence .runTransaction (
647
+ "Apply bundle documents" ,
648
+ () -> {
649
+ ImmutableSortedSet <DocumentKey > documentKeys = DocumentKey .emptyKeySet ();
650
+ Map <DocumentKey , MaybeDocument > documentMap = new HashMap <>();
651
+ Map <DocumentKey , SnapshotVersion > versionMap = new HashMap <>();
652
+
653
+ for (Entry <DocumentKey , MaybeDocument > entry : documents ) {
654
+ DocumentKey documentKey = entry .getKey ();
655
+ MaybeDocument document = entry .getValue ();
656
+
657
+ if (document instanceof Document ) {
658
+ documentKeys = documentKeys .insert (documentKey );
659
+ }
660
+ documentMap .put (documentKey , document );
661
+ versionMap .put (documentKey , document .getVersion ());
662
+ }
663
+
664
+ targetCache .removeMatchingKeysForTargetId (umbrellaTargetData .getTargetId ());
665
+ targetCache .addMatchingKeys (documentKeys , umbrellaTargetData .getTargetId ());
666
+
667
+ Map <DocumentKey , MaybeDocument > changedDocs =
668
+ populateDocumentChanges (documentMap , versionMap , SnapshotVersion .NONE );
669
+ return localDocuments .getLocalViewOfDocuments (changedDocs );
670
+ });
671
+ }
672
+
673
+ /** Saves the given NamedQuery to local persistence. */
674
+ public void saveNamedQuery (NamedQuery namedQuery , ImmutableSortedSet <DocumentKey > documentKeys ) {
675
+ // Allocate a target for the named query such that it can be resumed from associated read time
676
+ // if users use it to listen.
677
+ // NOTE: this also means if no corresponding target exists, the new target will remain active
678
+ // and will not get collected, unless users happen to unlisten the query somehow.
679
+ TargetData existingTargetData = allocateTarget (namedQuery .getBundledQuery ().getTarget ());
680
+ int targetId = existingTargetData .getTargetId ();
681
+
682
+ persistence .runTransaction (
683
+ "Saved named query" ,
684
+ () -> {
685
+ // Only update the matching documents if it is newer than what the SDK already has
686
+ if (namedQuery .getReadTime ().compareTo (existingTargetData .getSnapshotVersion ()) > 0 ) {
687
+ // Update existing target data because the query from the bundle is newer.
688
+ TargetData newTargetData =
689
+ existingTargetData .withResumeToken (ByteString .EMPTY , namedQuery .getReadTime ());
690
+ queryDataByTarget .append (targetId , newTargetData );
691
+
692
+ targetCache .updateTargetData (newTargetData );
693
+ targetCache .removeMatchingKeysForTargetId (targetId );
694
+ targetCache .addMatchingKeys (documentKeys , targetId );
695
+ }
696
+
697
+ bundleCache .saveNamedQuery (namedQuery );
698
+ });
699
+ }
700
+
701
+ /** Returns the NameQuery associated with queryName or null if not found. */
702
+ public @ Nullable NamedQuery getNamedQuery (String queryName ) {
703
+ return persistence .runTransaction (
704
+ "Get named query" , () -> bundleCache .getNamedQuery (queryName ));
705
+ }
706
+
578
707
/** Mutable state for the transaction in allocateQuery. */
579
708
private static class AllocateQueryHolder {
580
709
TargetData cached ;
@@ -673,4 +802,15 @@ public LruGarbageCollector.Results collectGarbage(LruGarbageCollector garbageCol
673
802
return persistence .runTransaction (
674
803
"Collect garbage" , () -> garbageCollector .collect (queryDataByTarget ));
675
804
}
805
+
806
+ /**
807
+ * Creates a new target using the given bundle name, which will be used to hold the keys of all
808
+ * documents from the bundle in query-document mappings. This ensures that the loaded documents do
809
+ * not get garbage collected right away.
810
+ */
811
+ private static Target newUmbrellaTarget (String bundleName ) {
812
+ // It is OK that the path used for the query is not valid, because this will not be read and
813
+ // queried.
814
+ return Query .atPath (ResourcePath .fromString ("__bundle__/docs/" + bundleName )).toTarget ();
815
+ }
676
816
}
0 commit comments