Skip to content

Commit 0dc9426

Browse files
author
Brian Chen
committed
retrying transactions with backoff, override asyncQueue
1 parent 7a2403a commit 0dc9426

File tree

6 files changed

+108
-37
lines changed

6 files changed

+108
-37
lines changed

packages/firestore/src/api/database.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,9 +307,8 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
307307
// are already set to synchronize on the async queue.
308308
private _firestoreClient: FirestoreClient | undefined;
309309

310-
// Public for use in tests.
311310
// TODO(mikelehen): Use modularized initialization instead.
312-
readonly _queue = new AsyncQueue();
311+
private _queue = new AsyncQueue();
313312

314313
_dataConverter: UserDataConverter;
315314

@@ -456,6 +455,16 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
456455
return this._firestoreClient as FirestoreClient;
457456
}
458457

458+
// Public for use in tests.
459+
getQueue(): AsyncQueue {
460+
return this._queue;
461+
}
462+
463+
// Public for use in tests.
464+
setQueue(newQueue: AsyncQueue): void {
465+
this._queue = newQueue;
466+
}
467+
459468
private makeDatabaseInfo(): DatabaseInfo {
460469
return new DatabaseInfo(
461470
this._config.databaseId,

packages/firestore/src/core/firestore_client.ts

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import { Platform } from '../platform/platform';
3333
import { Datastore } from '../remote/datastore';
3434
import { RemoteStore } from '../remote/remote_store';
3535
import { JsonProtoSerializer } from '../remote/serializer';
36-
import { AsyncQueue } from '../util/async_queue';
36+
import { AsyncQueue, TimerId } from '../util/async_queue';
3737
import { Code, FirestoreError } from '../util/error';
3838
import { debug } from '../util/log';
3939
import { Deferred } from '../util/promise';
@@ -62,6 +62,7 @@ import { Query } from './query';
6262
import { Transaction } from './transaction';
6363
import { OnlineState, OnlineStateSource } from './types';
6464
import { ViewSnapshot } from './view_snapshot';
65+
import { ExponentialBackoff } from '../remote/backoff';
6566

6667
const LOG_TAG = 'FirestoreClient';
6768

@@ -70,6 +71,17 @@ const DOM_EXCEPTION_INVALID_STATE = 11;
7071
const DOM_EXCEPTION_ABORTED = 20;
7172
const DOM_EXCEPTION_QUOTA_EXCEEDED = 22;
7273

74+
/**
75+
* Initial backoff time in milliseconds after an error.
76+
* Set to 1s according to https://cloud.google.com/apis/design/errors.
77+
*/
78+
const BACKOFF_INITIAL_DELAY_MS = 1000;
79+
80+
/** Maximum backoff time in milliseconds */
81+
const BACKOFF_MAX_DELAY_MS = 60 * 1000;
82+
83+
const BACKOFF_FACTOR = 1.5;
84+
7385
export class IndexedDbPersistenceSettings {
7486
constructor(
7587
readonly cacheSizeBytes: number,
@@ -613,6 +625,15 @@ export class FirestoreClient {
613625
// We have to wait for the async queue to be sure syncEngine is initialized.
614626
return this.asyncQueue
615627
.enqueue(async () => {})
616-
.then(() => this.syncEngine.runTransaction(updateFunction));
628+
.then(() => {
629+
const backoff = new ExponentialBackoff(
630+
this.asyncQueue,
631+
TimerId.RetryTransaction,
632+
BACKOFF_INITIAL_DELAY_MS,
633+
BACKOFF_FACTOR,
634+
BACKOFF_MAX_DELAY_MS
635+
);
636+
return this.syncEngine.runTransaction(updateFunction, backoff);
637+
});
617638
}
618639
}

packages/firestore/src/core/sync_engine.ts

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ import {
7272
} from './view';
7373
import { ViewSnapshot } from './view_snapshot';
7474
import { isPermanentError } from '../remote/rpc_error';
75+
import { ExponentialBackoff } from '../remote/backoff';
7576

7677
const LOG_TAG = 'SyncEngine';
7778

@@ -352,10 +353,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
352353
* Takes an updateFunction in which a set of reads and writes can be performed
353354
* atomically. In the updateFunction, the client can read and write values
354355
* using the supplied transaction object. After the updateFunction, all
355-
* changes will be committed. If some other client has changed any of the data
356-
* referenced, then the updateFunction will be called again. If the
357-
* updateFunction still fails after the given number of retries, then the
358-
* transaction will be rejected.
356+
* changes will be committed. If a retryable error occurs (ex: some other
357+
* client has changed any of the data referenced), then the updateFunction
358+
* will be called again after a backoff. If the updateFunction still fails
359+
* after the given number of retries, then the transaction will be rejected.
359360
*
360361
* The transaction object passed to the updateFunction contains methods for
361362
* accessing documents and collections. Unlike other datastore access, data
@@ -367,30 +368,41 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
367368
*/
368369
async runTransaction<T>(
369370
updateFunction: (transaction: Transaction) => Promise<T>,
371+
backoff: ExponentialBackoff,
370372
retries = 5
371373
): Promise<T> {
372374
assert(retries >= 0, 'Got negative number of retries for transaction.');
373-
const transaction = this.remoteStore.createTransaction();
374-
const userPromise = updateFunction(transaction);
375-
if (
376-
isNullOrUndefined(userPromise) ||
377-
!userPromise.catch ||
378-
!userPromise.then
379-
) {
380-
return Promise.reject<T>(
381-
Error('Transaction callback must return a Promise')
382-
);
383-
}
384-
try {
385-
const result = await userPromise;
386-
await transaction.commit();
387-
return result;
388-
} catch (error) {
389-
if (retries > 0 && this.isRetryableTransactionError(error)) {
390-
return this.runTransaction(updateFunction, retries - 1);
375+
let transactionError;
376+
for (let i = 0; i < retries; i++) {
377+
const backoffDeferred = new Deferred<void>();
378+
backoff.backoffAndRun(async () => {
379+
backoffDeferred.resolve();
380+
});
381+
382+
try {
383+
await backoffDeferred.promise;
384+
const transaction = this.remoteStore.createTransaction();
385+
const userPromise = updateFunction(transaction);
386+
if (
387+
isNullOrUndefined(userPromise) ||
388+
!userPromise.catch ||
389+
!userPromise.then
390+
) {
391+
return Promise.reject<T>(
392+
Error('Transaction callback must return a Promise')
393+
);
394+
}
395+
const result = await userPromise;
396+
await transaction.commit();
397+
return result;
398+
} catch (error) {
399+
transactionError = error;
400+
if (!this.isRetryableTransactionError(error)) {
401+
break;
402+
}
391403
}
392-
return Promise.reject<T>(error);
393404
}
405+
return Promise.reject<T>(transactionError);
394406
}
395407

396408
async applyRemoteEvent(remoteEvent: RemoteEvent): Promise<void> {

packages/firestore/src/util/async_queue.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,10 @@ export enum TimerId {
5959
ClientMetadataRefresh = 'client_metadata_refresh',
6060

6161
/** A timer used to periodically attempt LRU Garbage collection */
62-
LruGarbageCollection = 'lru_garbage_collection'
62+
LruGarbageCollection = 'lru_garbage_collection',
63+
64+
/** A timer used to retry transactions. */
65+
RetryTransaction = 'retry_transaction'
6366
}
6467

6568
/**
@@ -263,13 +266,6 @@ export class AsyncQueue {
263266
`Attempted to schedule an operation with a negative delay of ${delayMs}`
264267
);
265268

266-
// While not necessarily harmful, we currently don't expect to have multiple
267-
// ops with the same timer id in the queue, so defensively reject them.
268-
assert(
269-
!this.containsDelayedOperation(timerId),
270-
`Attempted to schedule multiple operations with timer id ${timerId}.`
271-
);
272-
273269
const delayedOp = DelayedOperation.createAndSchedule<unknown>(
274270
this,
275271
timerId,

packages/firestore/test/integration/api/transactions.test.ts

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,31 @@ import { expect } from 'chai';
2020
import { Deferred } from '../../util/promise';
2121
import firebase from '../util/firebase_export';
2222
import * as integrationHelpers from '../util/helpers';
23+
import { setAsyncQueue } from '../util/internal_helpers';
24+
import { AsyncQueue, TimerId } from '../../../src/util/async_queue';
25+
import { CancelablePromise } from '../../../src/util/promise';
2326

24-
const apiDescribe = integrationHelpers.apiDescribe;
27+
/**
28+
* Version of the AsyncQueue that overrides `enqueueAfterDelay` to fast-forward
29+
* the backoffs used in retrying transactions.
30+
*/
31+
class AsyncQueueWithoutTransactionBackoff extends AsyncQueue {
32+
enqueueAfterDelay<T extends unknown>(
33+
timerId: TimerId,
34+
delayMs: number,
35+
op: () => Promise<T>
36+
): CancelablePromise<T> {
37+
const result = super.enqueueAfterDelay(timerId, delayMs, op);
38+
if (this.containsDelayedOperation(TimerId.RetryTransaction)) {
39+
this.runDelayedOperationsEarly(TimerId.RetryTransaction)
40+
.then(() => {})
41+
.catch(() => 'obligatory catch');
42+
}
43+
return result;
44+
}
45+
}
2546

47+
const apiDescribe = integrationHelpers.apiDescribe;
2648
apiDescribe('Database transactions', (persistence: boolean) => {
2749
type TransactionStage = (
2850
transaction: firestore.Transaction,
@@ -460,6 +482,7 @@ apiDescribe('Database transactions', (persistence: boolean) => {
460482
let started = 0;
461483

462484
return integrationHelpers.withTestDb(persistence, db => {
485+
setAsyncQueue(db, new AsyncQueueWithoutTransactionBackoff());
463486
const doc = db.collection('counters').doc();
464487
return doc
465488
.set({
@@ -516,6 +539,7 @@ apiDescribe('Database transactions', (persistence: boolean) => {
516539
let counter = 0;
517540

518541
return integrationHelpers.withTestDb(persistence, db => {
542+
setAsyncQueue(db, new AsyncQueueWithoutTransactionBackoff());
519543
const doc = db.collection('counters').doc();
520544
return doc
521545
.set({
@@ -654,6 +678,7 @@ apiDescribe('Database transactions', (persistence: boolean) => {
654678

655679
it('handle reading a doc twice with different versions', () => {
656680
return integrationHelpers.withTestDb(persistence, db => {
681+
setAsyncQueue(db, new AsyncQueueWithoutTransactionBackoff());
657682
const doc = db.collection('counters').doc();
658683
let counter = 0;
659684
return doc

packages/firestore/test/integration/util/internal_helpers.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,15 @@ import { DEFAULT_PROJECT_ID, DEFAULT_SETTINGS } from './helpers';
3131

3232
/** Helper to retrieve the AsyncQueue for a give FirebaseFirestore instance. */
3333
export function asyncQueue(db: firestore.FirebaseFirestore): AsyncQueue {
34-
return (db as Firestore)._queue;
34+
return (db as Firestore).getQueue();
35+
}
36+
37+
/** Helper to override the AsyncQueue for a give FirebaseFirestore instance. */
38+
export function setAsyncQueue(
39+
db: firestore.FirebaseFirestore,
40+
newQueue: AsyncQueue
41+
): void {
42+
(db as Firestore).setQueue(newQueue);
3543
}
3644

3745
export function getDefaultDatabaseInfo(): DatabaseInfo {

0 commit comments

Comments
 (0)