Commit d8d7293

Implement limbo resolution throttling.
1 parent 2c8c940 commit d8d7293

4 files changed: +391 additions, -23 deletions

packages/firestore/src/core/sync_engine.ts

Lines changed: 35 additions & 9 deletions
@@ -73,6 +73,7 @@ import { AsyncQueue } from '../util/async_queue';
 import { TransactionRunner } from './transaction_runner';
 
 const LOG_TAG = 'SyncEngine';
+const DEFAULT_MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
 
 /**
  * QueryView contains all of the data that SyncEngine needs to keep track of for
@@ -154,6 +155,9 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
   private limboResolutionsByTarget: {
     [targetId: number]: LimboResolution;
   } = {};
+  private readonly maxConcurrentLimboResolutions: number = DEFAULT_MAX_CONCURRENT_LIMBO_RESOLUTIONS;
+  /** The keys of documents whose limbo resolutions are enqueued. */
+  private limboListenQueue: DocumentKey[] = [];
   private limboDocumentRefs = new ReferenceSet();
   /** Stores user completion handlers, indexed by User and BatchId. */
   private mutationUserCallbacks = {} as {
@@ -174,8 +178,13 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
     private remoteStore: RemoteStore,
     // PORTING NOTE: Manages state synchronization in multi-tab environments.
     private sharedClientState: SharedClientState,
-    private currentUser: User
-  ) {}
+    private currentUser: User,
+    maxConcurrentLimboResolutions?: number
+  ) {
+    if (maxConcurrentLimboResolutions) {
+      this.maxConcurrentLimboResolutions = maxConcurrentLimboResolutions;
+    }
+  }
 
   // Only used for testing.
   get isPrimaryClient(): boolean {
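In the constructor hunk above, the limit stays at the module-level default of 100 unless a value is passed in, and because the override is guarded by a truthiness check, passing 0 (or undefined) also leaves the default in place. A minimal standalone sketch of that pattern, using generic names rather than the SDK's own:

const DEFAULT_LIMIT = 100;

class Throttle {
  // Falls back to the module-level default unless the constructor overrides it.
  private readonly limit: number = DEFAULT_LIMIT;

  constructor(limit?: number) {
    if (limit) {
      // Only a truthy value overrides the default; 0 and undefined are ignored.
      this.limit = limit;
    }
  }

  get maxConcurrent(): number {
    return this.limit;
  }
}

// new Throttle().maxConcurrent === 100, while new Throttle(2).maxConcurrent === 2.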
@@ -486,6 +495,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
       // So go ahead and remove it from bookkeeping.
       this.limboTargetsByKey = this.limboTargetsByKey.remove(limboKey);
       delete this.limboResolutionsByTarget[targetId];
+      this.startEnqueuedLimboResolutions();
 
       // TODO(klimt): We really only should do the following on permission
       // denied errors, but we don't have the cause code here.
@@ -740,6 +750,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
     this.remoteStore.unlisten(limboTargetId);
     this.limboTargetsByKey = this.limboTargetsByKey.remove(key);
     delete this.limboResolutionsByTarget[limboTargetId];
+    this.startEnqueuedLimboResolutions();
   }
 
   private updateTrackedLimbos(
@@ -770,29 +781,44 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
     const key = limboChange.key;
     if (!this.limboTargetsByKey.get(key)) {
       log.debug(LOG_TAG, 'New document in limbo: ' + key);
+      this.limboListenQueue.push(key);
+      this.startEnqueuedLimboResolutions();
+    }
+  }
+
+  private startEnqueuedLimboResolutions(): void {
+    while (
+      this.limboListenQueue.length > 0 &&
+      this.limboTargetsByKey.size < this.maxConcurrentLimboResolutions
+    ) {
+      const key = this.limboListenQueue.shift()!;
       const limboTargetId = this.limboTargetIdGenerator.next();
-      const query = Query.atPath(key.path);
       this.limboResolutionsByTarget[limboTargetId] = new LimboResolution(key);
+      this.limboTargetsByKey = this.limboTargetsByKey.insert(
+        key,
+        limboTargetId
+      );
       this.remoteStore.listen(
         new TargetData(
-          query.toTarget(),
+          Query.atPath(key.path).toTarget(),
           limboTargetId,
           TargetPurpose.LimboResolution,
           ListenSequence.INVALID
        )
      );
-      this.limboTargetsByKey = this.limboTargetsByKey.insert(
-        key,
-        limboTargetId
-      );
    }
  }

  // Visible for testing
-  currentLimboDocs(): SortedMap<DocumentKey, TargetId> {
+  activeLimboDocumentResolutions(): SortedMap<DocumentKey, TargetId> {
    return this.limboTargetsByKey;
  }

+  // Visible for testing
+  documentsEnqueuedForLimboResolution(): DocumentKey[] {
+    return this.limboListenQueue;
+  }
+
  private async emitNewSnapsAndNotifyLocalStore(
    changes: MaybeDocumentMap,
    remoteEvent?: RemoteEvent
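Taken together, the sync_engine.ts changes implement a bounded-concurrency FIFO: newly detected limbo keys are appended to limboListenQueue, and startEnqueuedLimboResolutions() drains the queue whenever a resolution completes or is rejected, starting listens only while the number of active limbo targets is below the cap. A rough, self-contained sketch of that pattern follows; the class and method names are hypothetical and only illustrate the technique, they are not the SyncEngine API:

const DEFAULT_MAX_CONCURRENT = 100;

class BoundedResolutionQueue<K> {
  private readonly pending: K[] = []; // keys waiting for a listen slot
  private readonly active = new Set<K>(); // keys with an active listen

  constructor(
    private readonly startListen: (key: K) => void,
    private readonly maxConcurrent: number = DEFAULT_MAX_CONCURRENT
  ) {}

  // Enqueue a key and immediately try to start work (the "new document in
  // limbo" path above).
  enqueue(key: K): void {
    this.pending.push(key);
    this.drain();
  }

  // Free a slot when a resolution completes or is rejected, then refill it
  // (the two call sites of startEnqueuedLimboResolutions() above).
  finish(key: K): void {
    this.active.delete(key);
    this.drain();
  }

  // Start queued work while the number of active resolutions is under the cap,
  // mirroring startEnqueuedLimboResolutions().
  private drain(): void {
    while (this.pending.length > 0 && this.active.size < this.maxConcurrent) {
      const key = this.pending.shift()!;
      this.active.add(key);
      this.startListen(key);
    }
  }
}

// With a cap of 2, the third key waits until one of the first two finishes.
const queue = new BoundedResolutionQueue<string>(k => console.log('listen', k), 2);
['a', 'b', 'c'].forEach(k => queue.enqueue(k)); // starts 'a' and 'b'; 'c' waits
queue.finish('a'); // frees a slot, so 'c' starts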

packages/firestore/test/unit/specs/limbo_spec.test.ts

Lines changed: 263 additions & 0 deletions
@@ -639,4 +639,267 @@ describeSpec('Limbo Documents:', [], () => {
       );
     }
   );
+
+  specTest(
+    'Limbo resolution throttling with all results at once from watch',
+    ['no-ios'],
+    () => {
+      const query = Query.atPath(path('collection'));
+      const doc1 = doc('collection/a', 1000, { key: 'a' });
+      const doc2 = doc('collection/b', 1000, { key: 'b' });
+      const doc3 = doc('collection/c', 1000, { key: 'c' });
+      const limboQuery1 = Query.atPath(doc1.key.path);
+      const limboQuery2 = Query.atPath(doc2.key.path);
+      const limboQuery3 = Query.atPath(doc3.key.path);
+
+      return (
+        spec()
+          .withMaxConcurrentLimboResolutions(2)
+          .userListens(query)
+          .watchAcksFull(query, 1000, doc1, doc2, doc3)
+          .expectEvents(query, {
+            added: [doc1, doc2, doc3]
+          })
+          .watchResets(query)
+          .watchSends({ affects: [query] })
+          .watchCurrents(query, 'resume-token-2000')
+          .watchSnapshots(2000)
+          .expectLimboDocsEx({
+            activeKeys: [doc1.key, doc2.key],
+            inactiveKeys: [doc3.key]
+          })
+          // Limbo document causes query to be "inconsistent"
+          .expectEvents(query, { fromCache: true })
+          .watchAcks(limboQuery1)
+          .watchAcks(limboQuery2)
+          .watchCurrents(limboQuery1, 'resume-token-2001')
+          .watchCurrents(limboQuery2, 'resume-token-2001')
+          .watchSnapshots(2001)
+          .expectLimboDocs(doc3.key)
+          .expectEvents(query, {
+            removed: [doc1, doc2],
+            fromCache: true
+          })
+          .watchAcks(limboQuery3)
+          .watchCurrents(limboQuery3, 'resume-token-2001')
+          .watchSnapshots(2001)
+          .expectLimboDocs()
+          .expectEvents(query, {
+            removed: [doc3],
+            fromCache: false
+          })
+      );
+    }
+  );
+
+  specTest(
+    'Limbo resolution throttling with results one at a time from watch',
+    ['no-ios'],
+    () => {
+      const query = Query.atPath(path('collection'));
+      const doc1 = doc('collection/a', 1000, { key: 'a' });
+      const doc2 = doc('collection/b', 1000, { key: 'b' });
+      const doc3 = doc('collection/c', 1000, { key: 'c' });
+      const limboQuery1 = Query.atPath(doc1.key.path);
+      const limboQuery2 = Query.atPath(doc2.key.path);
+      const limboQuery3 = Query.atPath(doc3.key.path);
+
+      return (
+        spec()
+          .withMaxConcurrentLimboResolutions(2)
+          .userListens(query)
+          .watchAcksFull(query, 1000, doc1, doc2, doc3)
+          .expectEvents(query, {
+            added: [doc1, doc2, doc3]
+          })
+          .watchResets(query)
+          .watchSends({ affects: [query] })
+          .watchCurrents(query, 'resume-token-2000')
+          .watchSnapshots(2000)
+          .expectLimboDocsEx({
+            activeKeys: [doc1.key, doc2.key],
+            inactiveKeys: [doc3.key]
+          })
+          // Limbo document causes query to be "inconsistent"
+          .expectEvents(query, { fromCache: true })
+          .watchAcks(limboQuery1)
+          .watchCurrents(limboQuery1, 'resume-token-2001')
+          .watchSnapshots(2001)
+          .expectLimboDocs(doc2.key, doc3.key)
+          .expectEvents(query, {
+            removed: [doc1],
+            fromCache: true
+          })
+          .watchAcks(limboQuery2)
+          .watchCurrents(limboQuery2, 'resume-token-2001')
+          .watchSnapshots(2001)
+          .expectLimboDocs(doc3.key)
+          .expectEvents(query, {
+            removed: [doc2],
+            fromCache: true
+          })
+          .watchAcks(limboQuery3)
+          .watchCurrents(limboQuery3, 'resume-token-2001')
+          .watchSnapshots(2001)
+          .expectLimboDocs()
+          .expectEvents(query, {
+            removed: [doc3],
+            fromCache: false
+          })
+      );
+    }
+  );
+
+  specTest(
+    'Limbo resolution throttling when a limbo listen is rejected.',
+    ['no-ios'],
+    () => {
+      const query = Query.atPath(path('collection'));
+      const doc1 = doc('collection/a', 1000, { key: 'a' });
+      const doc2 = doc('collection/b', 1000, { key: 'b' });
+      const limboQuery1 = Query.atPath(doc1.key.path);
+      const limboQuery2 = Query.atPath(doc2.key.path);
+
+      return (
+        spec()
+          .withMaxConcurrentLimboResolutions(1)
+          .userListens(query)
+          .watchAcksFull(query, 1000, doc1, doc2)
+          .expectEvents(query, { added: [doc1, doc2] })
+          // Watch tells us that the query results have changed to the empty
+          // set, which makes our local cache inconsistent with the remote
+          // state, causing a fromCache=true event to be raised.
+          .watchResets(query)
+          .watchSends({ affects: [query] })
+          .watchCurrents(query, 'resume-token-1001')
+          .watchSnapshots(1001)
+          // Both doc1 and doc2 are in limbo, but the maximum number of limbo
+          // listens was set to 1, which causes doc1 to get resolved and doc2
+          // to get enqueued.
+          .expectLimboDocsEx({
+            activeKeys: [doc1.key],
+            inactiveKeys: [doc2.key]
+          })
+          // Limbo document causes query to be "inconsistent"
+          .expectEvents(query, { fromCache: true })
+          .watchRemoves(
+            limboQuery1,
+            new RpcError(Code.RESOURCE_EXHAUSTED, 'Resource exhausted')
+          )
+          // When a limbo listen gets rejected, we assume that it was deleted.
+          // But now that doc1 is resolved, the limbo resolution for doc2 can
+          // start.
+          .expectLimboDocs(doc2.key)
+          .expectEvents(query, { removed: [doc1], fromCache: true })
+          // Reject the listen for the second limbo resolution as well, in order
+          // to exercise the code path of a rejected limbo resolution without
+          // any enqueued limbo resolutions.
+          .watchRemoves(
+            limboQuery2,
+            new RpcError(Code.RESOURCE_EXHAUSTED, 'Resource exhausted')
+          )
+          .expectLimboDocs()
+          .expectEvents(query, { removed: [doc2] })
+      );
+    }
+  );
+
+  specTest(
+    // This test exercises the steps that resulted in unbounded reads that
+    // motivated throttling:
+    // https://github.com/firebase/firebase-js-sdk/issues/2683
+    'Limbo resolution throttling with existence filter mismatch',
+    ['no-ios'],
+    () => {
+      const query = Query.atPath(path('collection'));
+      const docA1 = doc('collection/a1', 1000, { key: 'a1' });
+      const docA2 = doc('collection/a2', 1000, { key: 'a2' });
+      const docA3 = doc('collection/a3', 1000, { key: 'a3' });
+      const docB1 = doc('collection/b1', 1000, { key: 'b1' });
+      const docB2 = doc('collection/b2', 1000, { key: 'b2' });
+      const docB3 = doc('collection/b3', 1000, { key: 'b3' });
+      const docA1Query = Query.atPath(docA1.key.path);
+      const docA2Query = Query.atPath(docA2.key.path);
+      const docA3Query = Query.atPath(docA3.key.path);
+
+      return (
+        spec()
+          .withMaxConcurrentLimboResolutions(2)
+          .userListens(query)
+          .watchAcks(query)
+          .watchSends({ affects: [query] }, docA1, docA2, docA3)
+          .watchCurrents(query, 'resume-token-1000')
+          .watchSnapshots(1000)
+          .expectEvents(query, { added: [docA1, docA2, docA3] })
+          // At this point the query is consistent and matches 3 documents:
+          // docA1, docA2, and docA3. Then, network connectivity is lost.
+          .disableNetwork()
+          // The query listener is notified that the results are being served
+          // from cache since without network connection there is no way to know
+          // if we are in sync with the server.
+          .expectEvents(query, { fromCache: true })
+          .enableNetwork()
+          // The network connection has come back so the client re-registers
+          // the listener, providing the resume token from before. Watch will
+          // then send updates that occurred since the timestamp encoded in the
+          // resume token.
+          .restoreListen(query, 'resume-token-1000')
+          .watchAcks(query)
+          // Watch now tells us that the query results on the server are docB1,
+          // docB2, and docB3, along with an existence filter to state that the
+          // total number of documents that match the query is 3.
+          .watchSends({ affects: [query] }, docB1, docB2, docB3)
+          .watchFilters([query], docB1.key, docB2.key, docB3.key)
+          .watchSnapshots(1001)
+          // The query listener is now inconsistent because it had thought that
+          // the set of matching documents was docA1, docA2, and docA3 but the
+          // server just told us that the set of matching documents is
+          // completely different: docB1, docB2, and docB3. So the query
+          // notifies the user that these documents were added, but still says
+          // fromCache=true because we need to resolve docA1, docA2, and docA3.
+          .expectEvents(query, {
+            added: [docB1, docB2, docB3],
+            fromCache: true
+          })
+          // After the existence filter mismatch the client re-listens without
+          // a resume token.
+          .expectActiveTargets({ query, resumeToken: '' })
+          // When the existence filter mismatch was detected, we removed then
+          // re-added the target; therefore, watch acknowledges the removal.
+          .watchRemoves(query)
+          // Watch has re-run the query and returns the same result set: docB1,
+          // docB2, and docB3. This puts docA1, docA2, and docA3 into limbo,
+          // which the client then issues queries to resolve. Since the maximum
+          // number of concurrent limbo resolutions was set to 2, only the first
+          // two limbo resolutions are started, with the 3rd being enqueued.
+          .watchAcksFull(query, 1002, docB1, docB2, docB3)
+          .expectLimboDocsEx({
+            activeKeys: [docA1.key, docA2.key],
+            inactiveKeys: [docA3.key]
+          })
+          .watchAcks(docA1Query)
+          .watchAcks(docA2Query)
+          .watchCurrents(docA1Query, 'resume-token-1003')
+          .watchCurrents(docA2Query, 'resume-token-1003')
+          .watchSnapshots(1003)
+          // Watch has now confirmed that docA1 and docA2 have been deleted. So
+          // the listener sends an event that the documents have
+          // been removed; however, since docA3 is still enqueued for limbo
+          // resolution the results are still from cache; however, now that
+          // there are 0 limbo resolutions in progress, the limbo resolution for
+          // docA3 is started.
+          .expectEvents(query, { removed: [docA1, docA2], fromCache: true })
+          .expectLimboDocs(docA3.key)
+          .watchAcks(docA3Query)
+          .watchCurrents(docA3Query, 'resume-token-1004')
+          .watchSnapshots(1004)
+          // Watch has now confirmed that docA3 has been deleted. So the
+          // listener sends an event about this and now specifies
+          // fromCache=false since we are in sync with the server and all docs
+          // that were in limbo have been resolved.
+          .expectEvents(query, { removed: [docA3] })
+          .expectLimboDocs()
+      );
+    }
+  );
 });
