@@ -73,7 +73,6 @@ import { AsyncQueue } from '../util/async_queue';
73
73
import { TransactionRunner } from './transaction_runner' ;
74
74
75
75
const LOG_TAG = 'SyncEngine' ;
76
- const DEFAULT_MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100 ;
77
76
78
77
/**
79
78
* QueryView contains all of the data that SyncEngine needs to keep track of for
@@ -149,15 +148,16 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
149
148
q . canonicalId ( )
150
149
) ;
151
150
private queriesByTarget : { [ targetId : number ] : Query [ ] } = { } ;
151
+ /** The keys of documents that are in limbo for which we haven't yet started a limbo resolution query. */
152
+ private limboListenQueue : DocumentKey [ ] = [ ] ;
153
+ /** Keeps track of the target ID for each document that is in limbo with an active target. */
152
154
private limboTargetsByKey = new SortedMap < DocumentKey , TargetId > (
153
155
DocumentKey . comparator
154
156
) ;
157
+ /** Keeps track of the information about an active limbo resolution for each active target ID that was started for the purpose of limbo resolution. */
155
158
private limboResolutionsByTarget : {
156
159
[ targetId : number ] : LimboResolution ;
157
160
} = { } ;
158
- private readonly maxConcurrentLimboResolutions : number = DEFAULT_MAX_CONCURRENT_LIMBO_RESOLUTIONS ;
159
- /** The keys of documents whose limbo resolutions are enqueued. */
160
- private limboListenQueue : DocumentKey [ ] = [ ] ;
161
161
private limboDocumentRefs = new ReferenceSet ( ) ;
162
162
/** Stores user completion handlers, indexed by User and BatchId. */
163
163
private mutationUserCallbacks = { } as {
@@ -179,12 +179,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
179
179
// PORTING NOTE: Manages state synchronization in multi-tab environments.
180
180
private sharedClientState : SharedClientState ,
181
181
private currentUser : User ,
182
- maxConcurrentLimboResolutions ?: number
183
- ) {
184
- if ( maxConcurrentLimboResolutions ) {
185
- this . maxConcurrentLimboResolutions = maxConcurrentLimboResolutions ;
186
- }
187
- }
182
+ private maxConcurrentLimboResolutions : number = 100
183
+ ) { }
188
184
189
185
// Only used for testing.
190
186
get isPrimaryClient ( ) : boolean {
@@ -495,7 +491,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
495
491
// So go ahead and remove it from bookkeeping.
496
492
this . limboTargetsByKey = this . limboTargetsByKey . remove ( limboKey ) ;
497
493
delete this . limboResolutionsByTarget [ targetId ] ;
498
- this . startEnqueuedLimboResolutions ( ) ;
494
+ this . pumpLimboResolutionListenQueue ( ) ;
499
495
500
496
// TODO(klimt): We really only should do the following on permission
501
497
// denied errors, but we don't have the cause code here.
@@ -750,7 +746,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
750
746
this . remoteStore . unlisten ( limboTargetId ) ;
751
747
this . limboTargetsByKey = this . limboTargetsByKey . remove ( key ) ;
752
748
delete this . limboResolutionsByTarget [ limboTargetId ] ;
753
- this . startEnqueuedLimboResolutions ( ) ;
749
+ this . pumpLimboResolutionListenQueue ( ) ;
754
750
}
755
751
756
752
private updateTrackedLimbos (
@@ -782,11 +778,30 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
782
778
if ( ! this . limboTargetsByKey . get ( key ) ) {
783
779
log . debug ( LOG_TAG , 'New document in limbo: ' + key ) ;
784
780
this . limboListenQueue . push ( key ) ;
785
- this . startEnqueuedLimboResolutions ( ) ;
781
+ this . pumpLimboResolutionListenQueue ( ) ;
786
782
}
787
783
}
788
784
789
- private startEnqueuedLimboResolutions ( ) : void {
785
+ /**
786
+ * Starts listens for documents in limbo that are enqueued for resolution.
787
+ *
788
+ * When a document goes into limbo it is enqueued for resolution. This method
789
+ * repeatedly removes entries from the limbo resolution queue and starts a
790
+ * listen for them until either (1) the queue is empty, meaning that all
791
+ * documents that were in limbo either have active listens or have been
792
+ * resolved, or (2) the maximum number of concurrent limbo resolution listens
793
+ * has been reached.
794
+ *
795
+ * This method is invoked every time an entry is added to the limbo
796
+ * resolution queue and every time that a limbo resolution listen completes
797
+ * (either successfully or unsuccessfully). This ensures that all documents in
798
+ * limbo are eventually resolved.
799
+ *
800
+ * A maximum number of concurrent limbo resolution listens was implemented to
801
+ * prevent an unbounded number of active limbo resolution listens that can
802
+ * exhaust server resources and result in "resource exhausted" errors.
803
+ */
804
+ private pumpLimboResolutionListenQueue ( ) : void {
790
805
while (
791
806
this . limboListenQueue . length > 0 &&
792
807
this . limboTargetsByKey . size < this . maxConcurrentLimboResolutions
0 commit comments