Skip to content

Commit 8e2fd91

Browse files
IndexedDB Recovery for Limbo documents (#3039)
1 parent 54c7872 commit 8e2fd91

File tree

4 files changed

+132
-22
lines changed

4 files changed

+132
-22
lines changed

packages/firestore/src/core/sync_engine.ts

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -483,14 +483,6 @@ export class SyncEngine implements RemoteSyncer {
483483
const limboResolution = this.activeLimboResolutionsByTarget.get(targetId);
484484
const limboKey = limboResolution && limboResolution.key;
485485
if (limboKey) {
486-
// Since this query failed, we won't want to manually unlisten to it.
487-
// So go ahead and remove it from bookkeeping.
488-
this.activeLimboTargetsByKey = this.activeLimboTargetsByKey.remove(
489-
limboKey
490-
);
491-
this.activeLimboResolutionsByTarget.delete(targetId);
492-
this.pumpEnqueuedLimboResolutions();
493-
494486
// TODO(klimt): We really only should do the following on permission
495487
// denied errors, but we don't have the cause code here.
496488

@@ -513,7 +505,19 @@ export class SyncEngine implements RemoteSyncer {
513505
documentUpdates,
514506
resolvedLimboDocuments
515507
);
516-
return this.applyRemoteEvent(event);
508+
509+
await this.applyRemoteEvent(event);
510+
511+
// Since this query failed, we won't want to manually unlisten to it.
512+
// We only remove it from bookkeeping after we successfully applied the
513+
// RemoteEvent. If `applyRemoteEvent()` throws, we want to re-listen to
514+
// this query when the RemoteStore restarts the Watch stream, which should
515+
// re-trigger the target failure.
516+
this.activeLimboTargetsByKey = this.activeLimboTargetsByKey.remove(
517+
limboKey
518+
);
519+
this.activeLimboResolutionsByTarget.delete(targetId);
520+
this.pumpEnqueuedLimboResolutions();
517521
} else {
518522
await this.localStore
519523
.releaseTarget(targetId, /* keepPersistedTargetData */ false)

packages/firestore/test/unit/specs/recovery_spec.test.ts

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import { client, spec } from './spec_builder';
2020
import { TimerId } from '../../../src/util/async_queue';
2121
import { Query } from '../../../src/core/query';
2222
import { Code } from '../../../src/util/error';
23-
import { doc, path } from '../../util/helpers';
23+
import { doc, filter, path } from '../../util/helpers';
2424
import { RpcError } from './spec_rpc_error';
2525

2626
describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
@@ -309,4 +309,101 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
309309
);
310310
}
311311
);
312+
313+
specTest(
314+
'Recovers when Limbo acknowledgement cannot be persisted',
315+
['durable-persistence'],
316+
() => {
317+
const fullQuery = Query.atPath(path('collection'));
318+
const filteredQuery = Query.atPath(path('collection')).addFilter(
319+
filter('included', '==', true)
320+
);
321+
const doc1a = doc('collection/key1', 1, { included: true });
322+
const doc1b = doc('collection/key1', 1500, { included: false });
323+
const limboQuery = Query.atPath(doc1a.key.path);
324+
return spec()
325+
.userListens(fullQuery)
326+
.watchAcksFull(fullQuery, 1000, doc1a)
327+
.expectEvents(fullQuery, {
328+
added: [doc1a]
329+
})
330+
.userUnlistens(fullQuery)
331+
.userListens(filteredQuery)
332+
.expectEvents(filteredQuery, {
333+
added: [doc1a],
334+
fromCache: true
335+
})
336+
.watchAcksFull(filteredQuery, 2000)
337+
.expectLimboDocs(doc1a.key)
338+
.failDatabaseTransactions({ 'Get last remote snapshot version': true })
339+
.watchAcksFull(limboQuery, 3000, doc1b)
340+
.expectActiveTargets()
341+
.recoverDatabase()
342+
.runTimer(TimerId.AsyncQueueRetry)
343+
.expectActiveTargets(
344+
{
345+
query: filteredQuery,
346+
resumeToken: 'resume-token-2000'
347+
},
348+
{ query: limboQuery }
349+
)
350+
.watchAcksFull(filteredQuery, 4000)
351+
.watchAcksFull(limboQuery, 4000, doc1b)
352+
.expectLimboDocs()
353+
.expectEvents(filteredQuery, {
354+
removed: [doc1a]
355+
});
356+
}
357+
);
358+
359+
specTest(
360+
'Recovers when Limbo rejection cannot be persisted',
361+
['durable-persistence'],
362+
() => {
363+
const fullQuery = Query.atPath(path('collection'));
364+
const filteredQuery = Query.atPath(path('collection')).addFilter(
365+
filter('included', '==', true)
366+
);
367+
const doc1 = doc('collection/key1', 1, { included: true });
368+
const limboQuery = Query.atPath(doc1.key.path);
369+
return spec()
370+
.userListens(fullQuery)
371+
.watchAcksFull(fullQuery, 1000, doc1)
372+
.expectEvents(fullQuery, {
373+
added: [doc1]
374+
})
375+
.userUnlistens(fullQuery)
376+
.userListens(filteredQuery)
377+
.expectEvents(filteredQuery, {
378+
added: [doc1],
379+
fromCache: true
380+
})
381+
.watchAcksFull(filteredQuery, 2000)
382+
.expectLimboDocs(doc1.key)
383+
.failDatabaseTransactions({
384+
'Apply remote event': true,
385+
'Get last remote snapshot version': true
386+
})
387+
.watchRemoves(
388+
limboQuery,
389+
new RpcError(Code.PERMISSION_DENIED, 'Test error')
390+
)
391+
.expectActiveTargets()
392+
.recoverDatabase()
393+
.runTimer(TimerId.AsyncQueueRetry)
394+
.expectActiveTargets(
395+
{ query: filteredQuery, resumeToken: 'resume-token-2000' },
396+
{ query: limboQuery }
397+
)
398+
.watchAcksFull(filteredQuery, 3000)
399+
.watchRemoves(
400+
limboQuery,
401+
new RpcError(Code.PERMISSION_DENIED, 'Test error')
402+
)
403+
.expectLimboDocs()
404+
.expectEvents(filteredQuery, {
405+
removed: [doc1]
406+
});
407+
}
408+
);
312409
});

packages/firestore/test/unit/specs/spec_test_components.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,9 @@ export class MockConnection implements Connection {
206206
/** A Deferred that is resolved once watch opens. */
207207
watchOpen = new Deferred<void>();
208208

209+
/** Whether the Watch stream is open. */
210+
isWatchOpen = false;
211+
209212
invokeRPC<Req>(rpcName: string, request: Req): never {
210213
throw new Error('Not implemented!');
211214
}
@@ -263,6 +266,7 @@ export class MockConnection implements Connection {
263266
this.watchOpen = new Deferred<void>();
264267
this.watchStream!.callOnClose(err);
265268
this.watchStream = null;
269+
this.isWatchOpen = false;
266270
}
267271

268272
openStream<Req, Resp>(
@@ -351,6 +355,7 @@ export class MockConnection implements Connection {
351355
this.queue.enqueueAndForget(async () => {
352356
if (this.watchStream === watchStream) {
353357
watchStream.callOnOpen();
358+
this.isWatchOpen = true;
354359
this.watchOpen.resolve();
355360
}
356361
});

packages/firestore/test/unit/specs/spec_test_runner.ts

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -847,18 +847,22 @@ abstract class TestRunner {
847847

848848
private validateActiveLimboDocs(): void {
849849
let actualLimboDocs = this.syncEngine.activeLimboDocumentResolutions();
850-
// Validate that each active limbo doc has an expected active target
851-
actualLimboDocs.forEach((key, targetId) => {
852-
const targetIds = new Array(this.expectedActiveTargets.keys()).map(
853-
n => '' + n
854-
);
855-
expect(this.expectedActiveTargets.has(targetId)).to.equal(
856-
true,
857-
`Found limbo doc ${key.toString()}, but its target ID ${targetId} ` +
858-
`was not in the set of expected active target IDs ` +
859-
`(${targetIds.join(', ')})`
860-
);
861-
});
850+
851+
if (this.connection.isWatchOpen) {
852+
// Validate that each active limbo doc has an expected active target
853+
actualLimboDocs.forEach((key, targetId) => {
854+
const targetIds = new Array(this.expectedActiveTargets.keys()).map(
855+
n => '' + n
856+
);
857+
expect(this.expectedActiveTargets.has(targetId)).to.equal(
858+
true,
859+
`Found limbo doc ${key.toString()}, but its target ID ${targetId} ` +
860+
`was not in the set of expected active target IDs ` +
861+
`(${targetIds.join(', ')})`
862+
);
863+
});
864+
}
865+
862866
for (const expectedLimboDoc of this.expectedActiveLimboDocs) {
863867
expect(actualLimboDocs.get(expectedLimboDoc)).to.not.equal(
864868
null,

0 commit comments

Comments
 (0)