Skip to content

Commit c822e78

Browse files
author
Brian Chen
authored
Retrying transactions with backoff (#2063)
1 parent 1674abf commit c822e78

File tree

8 files changed

+193
-80
lines changed

8 files changed

+193
-80
lines changed

packages/firestore/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
that were previously incorrectly disallowed are now allowed. For example,
1616
after reading a document that doesn't exist, you can now set it multiple
1717
times successfully in a transaction.
18+
- [changed] Transactions now perform exponential backoff before retrying.
19+
This means transactions on highly contended documents are more likely to
20+
succeed.
1821

1922
# 1.3.3
2023
- [changed] Firestore now recovers more quickly after network connectivity

packages/firestore/src/core/firestore_client.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -609,9 +609,11 @@ export class FirestoreClient {
609609
updateFunction: (transaction: Transaction) => Promise<T>
610610
): Promise<T> {
611611
this.verifyNotShutdown();
612-
// We have to wait for the async queue to be sure syncEngine is initialized.
613-
return this.asyncQueue
614-
.enqueue(async () => {})
615-
.then(() => this.syncEngine.runTransaction(updateFunction));
612+
const deferred = new Deferred<T>();
613+
this.asyncQueue.enqueueAndForget(() => {
614+
this.syncEngine.runTransaction(this.asyncQueue, updateFunction, deferred);
615+
return Promise.resolve();
616+
});
617+
return deferred.promise;
616618
}
617619
}

packages/firestore/src/core/sync_engine.ts

Lines changed: 17 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ import { primitiveComparator } from '../util/misc';
3939
import { ObjectMap } from '../util/obj_map';
4040
import { Deferred } from '../util/promise';
4141
import { SortedMap } from '../util/sorted_map';
42-
import { isNullOrUndefined } from '../util/types';
4342

4443
import { ignoreIfPrimaryLeaseLoss } from '../local/indexeddb_persistence';
4544
import { isDocumentChangeMissingError } from '../local/indexeddb_remote_document_cache';
@@ -71,7 +70,8 @@ import {
7170
ViewDocumentChanges
7271
} from './view';
7372
import { ViewSnapshot } from './view_snapshot';
74-
import { isPermanentError } from '../remote/rpc_error';
73+
import { AsyncQueue } from '../util/async_queue';
74+
import { TransactionRunner } from './transaction_runner';
7575

7676
const LOG_TAG = 'SyncEngine';
7777

@@ -352,45 +352,30 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
352352
* Takes an updateFunction in which a set of reads and writes can be performed
353353
* atomically. In the updateFunction, the client can read and write values
354354
* using the supplied transaction object. After the updateFunction, all
355-
* changes will be committed. If some other client has changed any of the data
356-
* referenced, then the updateFunction will be called again. If the
357-
* updateFunction still fails after the given number of retries, then the
358-
* transaction will be rejected.
355+
* changes will be committed. If a retryable error occurs (ex: some other
356+
* client has changed any of the data referenced), then the updateFunction
357+
* will be called again after a backoff. If the updateFunction still fails
358+
* after all retries, then the transaction will be rejected.
359359
*
360360
* The transaction object passed to the updateFunction contains methods for
361361
* accessing documents and collections. Unlike other datastore access, data
362362
* accessed with the transaction will not reflect local changes that have not
363363
* been committed. For this reason, it is required that all reads are
364364
* performed before any writes. Transactions must be performed while online.
365365
*
366-
* The promise returned is resolved when the transaction is fully committed.
366+
* The Deferred input is resolved when the transaction is fully committed.
367367
*/
368-
async runTransaction<T>(
368+
runTransaction<T>(
369+
asyncQueue: AsyncQueue,
369370
updateFunction: (transaction: Transaction) => Promise<T>,
370-
retries = 5
371-
): Promise<T> {
372-
assert(retries >= 0, 'Got negative number of retries for transaction.');
373-
const transaction = this.remoteStore.createTransaction();
374-
const userPromise = updateFunction(transaction);
375-
if (
376-
isNullOrUndefined(userPromise) ||
377-
!userPromise.catch ||
378-
!userPromise.then
379-
) {
380-
return Promise.reject<T>(
381-
Error('Transaction callback must return a Promise')
382-
);
383-
}
384-
try {
385-
const result = await userPromise;
386-
await transaction.commit();
387-
return result;
388-
} catch (error) {
389-
if (retries > 0 && this.isRetryableTransactionError(error)) {
390-
return this.runTransaction(updateFunction, retries - 1);
391-
}
392-
return Promise.reject<T>(error);
393-
}
371+
deferred: Deferred<T>
372+
): void {
373+
new TransactionRunner<T>(
374+
asyncQueue,
375+
this.remoteStore,
376+
updateFunction,
377+
deferred
378+
).run();
394379
}
395380

396381
async applyRemoteEvent(remoteEvent: RemoteEvent): Promise<void> {
@@ -916,20 +901,6 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
916901
return activeQueries;
917902
}
918903

919-
private isRetryableTransactionError(error: Error): boolean {
920-
if (error.name === 'FirebaseError') {
921-
// In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
922-
// non-matching document versions with ABORTED. These errors should be retried.
923-
const code = (error as FirestoreError).code;
924-
return (
925-
code === 'aborted' ||
926-
code === 'failed-precondition' ||
927-
!isPermanentError(code)
928-
);
929-
}
930-
return false;
931-
}
932-
933904
// PORTING NOTE: Multi-tab only
934905
getActiveClients(): Promise<ClientId[]> {
935906
return this.localStore.getActiveClients();
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/**
2+
* @license
3+
* Copyright 2019 Google Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
import { Deferred } from '../util/promise';
19+
import { TimerId, AsyncQueue } from '../util/async_queue';
20+
import { ExponentialBackoff } from '../remote/backoff';
21+
import { Transaction } from './transaction';
22+
import { RemoteStore } from '../remote/remote_store';
23+
import { isNullOrUndefined } from '../util/types';
24+
import { isPermanentError } from '../remote/rpc_error';
25+
import { FirestoreError } from '../util/error';
26+
27+
const RETRY_COUNT = 5;
28+
29+
/**
30+
* TransactionRunner encapsulates the logic needed to run and retry transactions
31+
* with backoff.
32+
*/
33+
export class TransactionRunner<T> {
34+
private retries = RETRY_COUNT;
35+
private backoff: ExponentialBackoff;
36+
37+
constructor(
38+
private readonly asyncQueue: AsyncQueue,
39+
private readonly remoteStore: RemoteStore,
40+
private readonly updateFunction: (transaction: Transaction) => Promise<T>,
41+
private readonly deferred: Deferred<T>
42+
) {
43+
this.backoff = new ExponentialBackoff(
44+
this.asyncQueue,
45+
TimerId.RetryTransaction
46+
);
47+
}
48+
49+
/** Runs the transaction and sets the result on deferred. */
50+
run(): void {
51+
this.runWithBackOff();
52+
}
53+
54+
private runWithBackOff(): void {
55+
this.backoff.backoffAndRun(async () => {
56+
const transaction = this.remoteStore.createTransaction();
57+
const userPromise = this.tryRunUpdateFunction(transaction);
58+
if (userPromise) {
59+
userPromise
60+
.then(result => {
61+
this.asyncQueue.enqueueAndForget(() => {
62+
return transaction
63+
.commit()
64+
.then(() => {
65+
this.deferred.resolve(result);
66+
})
67+
.catch(commitError => {
68+
this.handleTransactionError(commitError);
69+
});
70+
});
71+
})
72+
.catch(userPromiseError => {
73+
this.handleTransactionError(userPromiseError);
74+
});
75+
}
76+
});
77+
}
78+
79+
private tryRunUpdateFunction(transaction: Transaction): Promise<T> | null {
80+
try {
81+
const userPromise = this.updateFunction(transaction);
82+
if (
83+
isNullOrUndefined(userPromise) ||
84+
!userPromise.catch ||
85+
!userPromise.then
86+
) {
87+
this.deferred.reject(
88+
Error('Transaction callback must return a Promise')
89+
);
90+
return null;
91+
}
92+
return userPromise;
93+
} catch (error) {
94+
// Do not retry errors thrown by user provided updateFunction.
95+
this.deferred.reject(error);
96+
return null;
97+
}
98+
}
99+
100+
private handleTransactionError(error: Error): void {
101+
if (this.retries > 0 && this.isRetryableTransactionError(error)) {
102+
this.retries -= 1;
103+
this.asyncQueue.enqueueAndForget(() => {
104+
this.runWithBackOff();
105+
return Promise.resolve();
106+
});
107+
} else {
108+
this.deferred.reject(error);
109+
}
110+
}
111+
112+
private isRetryableTransactionError(error: Error): boolean {
113+
if (error.name === 'FirebaseError') {
114+
// In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
115+
// non-matching document versions with ABORTED. These errors should be retried.
116+
const code = (error as FirestoreError).code;
117+
return (
118+
code === 'aborted' ||
119+
code === 'failed-precondition' ||
120+
!isPermanentError(code)
121+
);
122+
}
123+
return false;
124+
}
125+
}

packages/firestore/src/remote/backoff.ts

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,17 @@ import * as log from '../util/log';
2020
import { CancelablePromise } from '../util/promise';
2121
const LOG_TAG = 'ExponentialBackoff';
2222

23+
/**
24+
* Default initial backoff time in milliseconds after an error.
25+
* Set to 1s according to https://cloud.google.com/apis/design/errors.
26+
*/
27+
const DEFAULT_BACKOFF_INITIAL_DELAY_MS = 1000;
28+
29+
/** Default multiplier applied to the base delay after each attempt. */
const DEFAULT_BACKOFF_FACTOR = 1.5;
30+
31+
/** Default maximum base backoff time in milliseconds (60 seconds). */
32+
const DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;
33+
2334
/**
2435
* A helper for running delayed tasks following an exponential backoff curve
2536
* between attempts.
@@ -49,18 +60,18 @@ export class ExponentialBackoff {
4960
* Note that jitter will still be applied, so the actual delay could be as
5061
* little as 0.5*initialDelayMs.
5162
*/
52-
private readonly initialDelayMs: number,
63+
private readonly initialDelayMs: number = DEFAULT_BACKOFF_INITIAL_DELAY_MS,
5364
/**
5465
* The multiplier to use to determine the extended base delay after each
5566
* attempt.
5667
*/
57-
private readonly backoffFactor: number,
68+
private readonly backoffFactor: number = DEFAULT_BACKOFF_FACTOR,
5869
/**
5970
* The maximum base delay after which no further backoff is performed.
6071
* Note that jitter will still be applied, so the actual delay could be as
6172
* much as 1.5*maxDelayMs.
6273
*/
63-
private readonly maxDelayMs: number
74+
private readonly maxDelayMs: number = DEFAULT_BACKOFF_MAX_DELAY_MS
6475
) {
6576
this.reset();
6677
}

packages/firestore/src/remote/persistent_stream.ts

Lines changed: 1 addition & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -116,17 +116,6 @@ export interface PersistentStreamListener {
116116
onClose: (err?: FirestoreError) => Promise<void>;
117117
}
118118

119-
/**
120-
* Initial backoff time in milliseconds after an error.
121-
* Set to 1s according to https://cloud.google.com/apis/design/errors.
122-
*/
123-
const BACKOFF_INITIAL_DELAY_MS = 1000;
124-
125-
/** Maximum backoff time in milliseconds */
126-
const BACKOFF_MAX_DELAY_MS = 60 * 1000;
127-
128-
const BACKOFF_FACTOR = 1.5;
129-
130119
/** The time a stream stays open after it is marked idle. */
131120
const IDLE_TIMEOUT_MS = 60 * 1000;
132121

@@ -188,13 +177,7 @@ export abstract class PersistentStream<
188177
private credentialsProvider: CredentialsProvider,
189178
protected listener: ListenerType
190179
) {
191-
this.backoff = new ExponentialBackoff(
192-
queue,
193-
connectionTimerId,
194-
BACKOFF_INITIAL_DELAY_MS,
195-
BACKOFF_FACTOR,
196-
BACKOFF_MAX_DELAY_MS
197-
);
180+
this.backoff = new ExponentialBackoff(queue, connectionTimerId);
198181
}
199182

200183
/**

packages/firestore/src/util/async_queue.ts

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,13 @@ export enum TimerId {
5959
ClientMetadataRefresh = 'client_metadata_refresh',
6060

6161
/** A timer used to periodically attempt LRU Garbage collection */
62-
LruGarbageCollection = 'lru_garbage_collection'
62+
LruGarbageCollection = 'lru_garbage_collection',
63+
64+
/**
65+
* A timer used to retry transactions. Since there can be multiple concurrent
66+
* transactions, multiple of these may be in the queue at a given time.
67+
*/
68+
RetryTransaction = 'retry_transaction'
6369
}
6470

6571
/**
@@ -203,6 +209,9 @@ export class AsyncQueue {
203209
// assertion sanity-checks.
204210
private operationInProgress = false;
205211

212+
// List of TimerIds to fast-forward delays for.
213+
private timerIdsToSkip: TimerId[] = [];
214+
206215
// Is this AsyncQueue being shut down? If true, this instance will not enqueue
207216
// any new operations, Promises from enqueue requests will not resolve.
208217
get isShuttingDown(): boolean {
@@ -319,13 +328,6 @@ export class AsyncQueue {
319328
`Attempted to schedule an operation with a negative delay of ${delayMs}`
320329
);
321330

322-
// While not necessarily harmful, we currently don't expect to have multiple
323-
// ops with the same timer id in the queue, so defensively reject them.
324-
assert(
325-
!this.containsDelayedOperation(timerId),
326-
`Attempted to schedule multiple operations with timer id ${timerId}.`
327-
);
328-
329331
const delayedOp = DelayedOperation.createAndSchedule<T>(
330332
this,
331333
timerId,
@@ -336,6 +338,11 @@ export class AsyncQueue {
336338
);
337339
this.delayedOperations.push(delayedOp as DelayedOperation<unknown>);
338340

341+
// Fast-forward delays for timerIds that have been overridden.
342+
if (this.timerIdsToSkip.indexOf(delayedOp.timerId) > -1) {
343+
delayedOp.skipDelay();
344+
}
345+
339346
return delayedOp;
340347
}
341348

@@ -414,6 +421,13 @@ export class AsyncQueue {
414421
});
415422
}
416423

424+
/**
425+
* For Tests: Skip all subsequent delays for a timer id.
426+
*
* The id is appended to `timerIdsToSkip`; delayed operations scheduled
* afterwards with a matching timer id have `skipDelay()` called on them
* right after they are created.
*
* @param timerId The timer id whose future delays should be skipped.
*/
427+
skipDelaysForTimerId(timerId: TimerId): void {
428+
this.timerIdsToSkip.push(timerId);
429+
}
430+
417431
/** Called once a DelayedOperation is run or canceled. */
418432
private removeDelayedOperation(op: DelayedOperation<unknown>): void {
419433
// NOTE: indexOf / slice are O(n), but delayedOperations is expected to be small.

0 commit comments

Comments
 (0)