Skip to content

Commit 1f25d0d

Browse files
authored
Implement global resume token (#1052)
* Add a spec test that shows correct global resume token handling * Minimum implementation to handle global resume tokens * Remove unused QueryView.resumeToken * Avoid persisting the resume token unless required * Persist the resume token on unlisten
1 parent f14ebc2 commit 1f25d0d

File tree

5 files changed

+180
-20
lines changed

5 files changed

+180
-20
lines changed

packages/firestore/src/core/sync_engine.ts

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ import { Query } from './query';
4646
import { SnapshotVersion } from './snapshot_version';
4747
import { TargetIdGenerator } from './target_id_generator';
4848
import { Transaction } from './transaction';
49-
import { BatchId, OnlineState, ProtoByteString, TargetId } from './types';
49+
import { BatchId, OnlineState, TargetId } from './types';
5050
import {
5151
AddedLimboDocument,
5252
LimboDocumentChange,
@@ -77,12 +77,6 @@ class QueryView {
7777
* stream to identify this query.
7878
*/
7979
public targetId: TargetId,
80-
/**
81-
* An identifier from the datastore backend that indicates the last state
82-
* of the results that was received. This can be used to indicate where
83-
* to continue receiving new doc changes for the query.
84-
*/
85-
public resumeToken: ProtoByteString,
8680
/**
8781
* The view is responsible for computing the final merged truth of what
8882
* docs are in the query. It gets notified of local and remote changes,
@@ -195,12 +189,7 @@ export class SyncEngine implements RemoteSyncer {
195189
'applyChanges for new view should always return a snapshot'
196190
);
197191

198-
const data = new QueryView(
199-
query,
200-
queryData.targetId,
201-
queryData.resumeToken,
202-
view
203-
);
192+
const data = new QueryView(query, queryData.targetId, view);
204193
this.queryViewsByQuery.set(query, data);
205194
this.queryViewsByTarget[queryData.targetId] = data;
206195
this.viewHandler!([viewChange.snapshot!]);

packages/firestore/src/local/indexeddb_query_cache.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ export class IndexedDbQueryCache implements QueryCache {
4444
constructor(private serializer: LocalSerializer) {}
4545

4646
/**
47-
* The last received snapshot version. We store this seperately from the
47+
* The last received snapshot version. We store this separately from the
4848
* metadata to avoid the extra conversion to/from DbTimestamp.
4949
*/
5050
private lastRemoteSnapshotVersion = SnapshotVersion.MIN;
@@ -173,7 +173,7 @@ export class IndexedDbQueryCache implements QueryCache {
173173
): PersistencePromise<QueryData | null> {
174174
// Iterating by the canonicalId may yield more than one result because
175175
// canonicalId values are not required to be unique per target. This query
176-
// depends on the queryTargets index to be efficent.
176+
// depends on the queryTargets index to be efficient.
177177
const canonicalId = query.canonicalId();
178178
const range = IDBKeyRange.bound(
179179
[canonicalId, Number.NEGATIVE_INFINITY],
@@ -202,7 +202,7 @@ export class IndexedDbQueryCache implements QueryCache {
202202
targetId: TargetId
203203
): PersistencePromise<void> {
204204
// PORTING NOTE: The reverse index (documentsTargets) is maintained by
205-
// Indexeddb.
205+
// IndexedDb.
206206
const promises: Array<PersistencePromise<void>> = [];
207207
const store = documentTargetStore(txn);
208208
keys.forEach(key => {

packages/firestore/src/local/local_store.ts

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,15 @@ export interface LocalWriteResult {
111111
* unrecoverable error (should be caught / reported by the async_queue).
112112
*/
113113
export class LocalStore {
114+
/**
115+
* The maximum time to leave a resume token buffered without writing it out.
116+
* This value is arbitrary: it's long enough to avoid several writes
117+
* (possibly indefinitely if updates come more frequently than this) but
118+
* short enough that restarting after crashing will still have a pretty
119+
* recent resume token.
120+
*/
121+
private static readonly RESUME_TOKEN_MAX_AGE_MICROS = 5 * 60 * 1e6;
122+
114123
/**
115124
* The set of all mutations that have been sent but not yet been applied to
116125
* the backend.
@@ -469,12 +478,18 @@ export class LocalStore {
469478
// any preexisting value.
470479
const resumeToken = change.resumeToken;
471480
if (resumeToken.length > 0) {
481+
const oldQueryData = queryData;
472482
queryData = queryData.copy({
473483
resumeToken,
474484
snapshotVersion: remoteEvent.snapshotVersion
475485
});
476486
this.targetIds[targetId] = queryData;
477-
promises.push(this.queryCache.updateQueryData(txn, queryData));
487+
488+
if (
489+
LocalStore.shouldPersistQueryData(oldQueryData, queryData, change)
490+
) {
491+
promises.push(this.queryCache.updateQueryData(txn, queryData));
492+
}
478493
}
479494
}
480495
);
@@ -550,6 +565,50 @@ export class LocalStore {
550565
});
551566
}
552567

568+
/**
569+
* Returns true if the newQueryData should be persisted during an update of
570+
* an active target. QueryData should always be persisted when a target is
571+
* being released and should not call this function.
572+
*
573+
* While the target is active, QueryData updates can be omitted when nothing
574+
* about the target has changed except metadata like the resume token or
575+
* snapshot version. Occasionally it's worth the extra write to prevent these
576+
* values from getting too stale after a crash, but this doesn't have to be
577+
* too frequent.
578+
*/
579+
private static shouldPersistQueryData(
580+
oldQueryData: QueryData,
581+
newQueryData: QueryData,
582+
change: TargetChange
583+
): boolean {
584+
// Avoid clearing any existing value
585+
if (newQueryData.resumeToken.length === 0) return false;
586+
587+
// Any resume token is interesting if there isn't one already.
588+
if (oldQueryData.resumeToken.length === 0) return true;
589+
590+
// Don't allow resume token changes to be buffered indefinitely. This
591+
// allows us to be reasonably up-to-date after a crash and avoids needing
592+
// to loop over all active queries on shutdown. Especially in the browser
593+
// we may not get time to do anything interesting while the current tab is
594+
// closing.
595+
const timeDelta =
596+
newQueryData.snapshotVersion.toMicroseconds() -
597+
oldQueryData.snapshotVersion.toMicroseconds();
598+
if (timeDelta >= this.RESUME_TOKEN_MAX_AGE_MICROS) return true;
599+
600+
// Otherwise if the only thing that has changed about a target is its resume
601+
// token it's not worth persisting. Note that the RemoteStore keeps an
602+
// in-memory view of the currently active targets which includes the current
603+
// resume token, so stream failure or user changes will still use an
604+
// up-to-date resume token regardless of what we do here.
605+
const changes =
606+
change.addedDocuments.size +
607+
change.modifiedDocuments.size +
608+
change.removedDocuments.size;
609+
return changes > 0;
610+
}
611+
553612
/**
554613
* Notify local store of the changed views to locally pin documents.
555614
*/
@@ -638,10 +697,22 @@ export class LocalStore {
638697
queryData != null,
639698
'Tried to release nonexistent query: ' + query
640699
);
641-
this.localViewReferences.removeReferencesForId(queryData!.targetId);
642-
delete this.targetIds[queryData!.targetId];
700+
701+
const targetId = queryData!.targetId;
702+
const cachedQueryData = this.targetIds[targetId];
703+
704+
this.localViewReferences.removeReferencesForId(targetId);
705+
delete this.targetIds[targetId];
643706
if (this.garbageCollector.isEager) {
644707
return this.queryCache.removeQueryData(txn, queryData!);
708+
} else if (
709+
cachedQueryData.snapshotVersion > queryData!.snapshotVersion
710+
) {
711+
// If we've been avoiding persisting the resumeToken (see
712+
// shouldPersistQueryData for conditions and rationale) we need to
713+
// persist the token now because there will no longer be an
714+
// in-memory version to fall back on.
715+
return this.queryCache.updateQueryData(txn, cachedQueryData);
645716
} else {
646717
return PersistencePromise.resolve();
647718
}

packages/firestore/src/remote/watch_change.ts

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ export class WatchChangeAggregator {
297297

298298
/** Processes and adds the WatchTargetChange to the current set of changes. */
299299
handleTargetChange(targetChange: WatchTargetChange): void {
300-
targetChange.targetIds.forEach(targetId => {
300+
this.forEachTarget(targetChange, targetId => {
301301
const targetState = this.ensureTargetState(targetId);
302302
switch (targetChange.state) {
303303
case WatchTargetChangeState.NoChange:
@@ -352,6 +352,22 @@ export class WatchChangeAggregator {
352352
});
353353
}
354354

355+
/**
356+
* Iterates over all targetIds that the watch change applies to: either the
357+
* targetIds explicitly listed in the change or the targetIds of all currently
358+
* active targets.
359+
*/
360+
forEachTarget(
361+
targetChange: WatchTargetChange,
362+
fn: (targetId: TargetId) => void
363+
): void {
364+
if (targetChange.targetIds.length > 0) {
365+
targetChange.targetIds.forEach(fn);
366+
} else {
367+
objUtils.forEachNumber(this.targetStates, fn);
368+
}
369+
}
370+
355371
/**
356372
* Handles existence filters and synthesizes deletes for filter mismatches.
357373
* Targets that are invalidated by filter mismatches are added to

packages/firestore/test/unit/specs/listen_spec.test.ts

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,4 +509,88 @@ describeSpec('Listens:', [], () => {
509509
.watchAcksFull(query, 3000)
510510
.expectEvents(query, {});
511511
});
512+
513+
specTest('Persists global resume tokens on unlisten', [], () => {
514+
const query = Query.atPath(path('collection'));
515+
const docA = doc('collection/a', 1000, { key: 'a' });
516+
517+
return (
518+
spec()
519+
.withGCEnabled(false)
520+
.userListens(query)
521+
.watchAcksFull(query, 1000, docA)
522+
.expectEvents(query, { added: [docA] })
523+
524+
// Some time later, watch sends an updated resume token and the user stops
525+
// listening.
526+
.watchSnapshots(2000, [], 'resume-token-2000')
527+
.userUnlistens(query)
528+
.watchRemoves(query)
529+
530+
.userListens(query, 'resume-token-2000')
531+
.expectEvents(query, { added: [docA], fromCache: true })
532+
.watchAcks(query)
533+
.watchCurrents(query, 'resume-token-3000')
534+
.watchSnapshots(3000)
535+
.expectEvents(query, { fromCache: false })
536+
);
537+
});
538+
539+
specTest('Omits global resume tokens for a short while', [], () => {
540+
const query = Query.atPath(path('collection'));
541+
const docA = doc('collection/a', 1000, { key: 'a' });
542+
543+
return (
544+
spec()
545+
.withGCEnabled(false)
546+
.userListens(query)
547+
.watchAcksFull(query, 1000, docA)
548+
.expectEvents(query, { added: [docA] })
549+
550+
// One millisecond later, watch sends an updated resume token but the
551+
// user doesn't manage to unlisten before restart.
552+
.watchSnapshots(2000, [], 'resume-token-2000')
553+
.restart()
554+
555+
.userListens(query, 'resume-token-1000')
556+
.expectEvents(query, { added: [docA], fromCache: true })
557+
.watchAcks(query)
558+
.watchCurrents(query, 'resume-token-3000')
559+
.watchSnapshots(3000)
560+
.expectEvents(query, { fromCache: false })
561+
);
562+
});
563+
564+
specTest(
565+
'Persists global resume tokens if the snapshot is old enough',
566+
[],
567+
() => {
568+
const initialVersion = 1000;
569+
const minutesLater = 5 * 60 * 1e6 + initialVersion;
570+
const evenLater = 1000 + minutesLater;
571+
572+
const query = Query.atPath(path('collection'));
573+
const docA = doc('collection/a', initialVersion, { key: 'a' });
574+
575+
return (
576+
spec()
577+
.withGCEnabled(false)
578+
.userListens(query)
579+
.watchAcksFull(query, initialVersion, docA)
580+
.expectEvents(query, { added: [docA] })
581+
582+
// 5 minutes later, watch sends an updated resume token but the user
583+
// doesn't manage to unlisten before restart.
584+
.watchSnapshots(minutesLater, [], 'resume-token-minutes-later')
585+
.restart()
586+
587+
.userListens(query, 'resume-token-minutes-later')
588+
.expectEvents(query, { added: [docA], fromCache: true })
589+
.watchAcks(query)
590+
.watchCurrents(query, 'resume-token-even-later')
591+
.watchSnapshots(evenLater)
592+
.expectEvents(query, { fromCache: false })
593+
);
594+
}
595+
);
512596
});

0 commit comments

Comments
 (0)