Skip to content

Retrying transactions with backoff #2063

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Aug 13, 2019
3 changes: 3 additions & 0 deletions packages/firestore/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
that were previously incorrectly disallowed are now allowed. For example,
after reading a document that doesn't exist, you can now set it multiple
times successfully in a transaction.
- [changed] Transactions now perform exponential backoff before retrying.
This means transactions on highly contended documents are more likely to
succeed.

# 1.3.3
- [changed] Firestore now recovers more quickly after network connectivity
Expand Down
10 changes: 6 additions & 4 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -609,9 +609,11 @@ export class FirestoreClient {
updateFunction: (transaction: Transaction) => Promise<T>
): Promise<T> {
this.verifyNotShutdown();
// We have to wait for the async queue to be sure syncEngine is initialized.
return this.asyncQueue
.enqueue(async () => {})
.then(() => this.syncEngine.runTransaction(updateFunction));
const deferred = new Deferred<T>();
this.asyncQueue.enqueueAndForget(() => {
this.syncEngine.runTransaction(this.asyncQueue, updateFunction, deferred);
return Promise.resolve();
});
return deferred.promise;
}
}
63 changes: 17 additions & 46 deletions packages/firestore/src/core/sync_engine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import { primitiveComparator } from '../util/misc';
import { ObjectMap } from '../util/obj_map';
import { Deferred } from '../util/promise';
import { SortedMap } from '../util/sorted_map';
import { isNullOrUndefined } from '../util/types';

import { ignoreIfPrimaryLeaseLoss } from '../local/indexeddb_persistence';
import { isDocumentChangeMissingError } from '../local/indexeddb_remote_document_cache';
Expand Down Expand Up @@ -71,7 +70,8 @@ import {
ViewDocumentChanges
} from './view';
import { ViewSnapshot } from './view_snapshot';
import { isPermanentError } from '../remote/rpc_error';
import { AsyncQueue } from '../util/async_queue';
import { TransactionRunner } from './transaction_runner';

const LOG_TAG = 'SyncEngine';

Expand Down Expand Up @@ -352,45 +352,30 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
* Takes an updateFunction in which a set of reads and writes can be performed
* atomically. In the updateFunction, the client can read and write values
* using the supplied transaction object. After the updateFunction, all
* changes will be committed. If some other client has changed any of the data
* referenced, then the updateFunction will be called again. If the
* updateFunction still fails after the given number of retries, then the
* transaction will be rejected.
* changes will be committed. If a retryable error occurs (ex: some other
* client has changed any of the data referenced), then the updateFunction
* will be called again after a backoff. If the updateFunction still fails
* after all retries, then the transaction will be rejected.
*
* The transaction object passed to the updateFunction contains methods for
* accessing documents and collections. Unlike other datastore access, data
* accessed with the transaction will not reflect local changes that have not
* been committed. For this reason, it is required that all reads are
* performed before any writes. Transactions must be performed while online.
*
* The promise returned is resolved when the transaction is fully committed.
* The Deferred input is resolved when the transaction is fully committed.
*/
async runTransaction<T>(
runTransaction<T>(
asyncQueue: AsyncQueue,
updateFunction: (transaction: Transaction) => Promise<T>,
retries = 5
): Promise<T> {
assert(retries >= 0, 'Got negative number of retries for transaction.');
const transaction = this.remoteStore.createTransaction();
const userPromise = updateFunction(transaction);
if (
isNullOrUndefined(userPromise) ||
!userPromise.catch ||
!userPromise.then
) {
return Promise.reject<T>(
Error('Transaction callback must return a Promise')
);
}
try {
const result = await userPromise;
await transaction.commit();
return result;
} catch (error) {
if (retries > 0 && this.isRetryableTransactionError(error)) {
return this.runTransaction(updateFunction, retries - 1);
}
return Promise.reject<T>(error);
}
deferred: Deferred<T>
): void {
new TransactionRunner<T>(
asyncQueue,
this.remoteStore,
updateFunction,
deferred
).run();
}

async applyRemoteEvent(remoteEvent: RemoteEvent): Promise<void> {
Expand Down Expand Up @@ -916,20 +901,6 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
return activeQueries;
}

private isRetryableTransactionError(error: Error): boolean {
if (error.name === 'FirebaseError') {
// In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
// non-matching document versions with ABORTED. These errors should be retried.
const code = (error as FirestoreError).code;
return (
code === 'aborted' ||
code === 'failed-precondition' ||
!isPermanentError(code)
);
}
return false;
}

// PORTING NOTE: Multi-tab only
getActiveClients(): Promise<ClientId[]> {
return this.localStore.getActiveClients();
Expand Down
125 changes: 125 additions & 0 deletions packages/firestore/src/core/transaction_runner.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/**
* @license
* Copyright 2019 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Deferred } from '../util/promise';
import { TimerId, AsyncQueue } from '../util/async_queue';
import { ExponentialBackoff } from '../remote/backoff';
import { Transaction } from './transaction';
import { RemoteStore } from '../remote/remote_store';
import { isNullOrUndefined } from '../util/types';
import { isPermanentError } from '../remote/rpc_error';
import { FirestoreError } from '../util/error';

const RETRY_COUNT = 5;

/**
* TransactionRunner encapsulates the logic needed to run and retry transactions
* with backoff.
*/
export class TransactionRunner<T> {
private retries = RETRY_COUNT;
private backoff: ExponentialBackoff;

constructor(
private readonly asyncQueue: AsyncQueue,
private readonly remoteStore: RemoteStore,
private readonly updateFunction: (transaction: Transaction) => Promise<T>,
private readonly deferred: Deferred<T>
) {
this.backoff = new ExponentialBackoff(
this.asyncQueue,
TimerId.RetryTransaction
);
}

/** Runs the transaction and sets the result on deferred. */
run(): void {
this.runWithBackOff();
}

private runWithBackOff(): void {
this.backoff.backoffAndRun(async () => {
const transaction = this.remoteStore.createTransaction();
const userPromise = this.tryRunUpdateFunction(transaction);
if (userPromise) {
userPromise
.then(result => {
this.asyncQueue.enqueueAndForget(() => {
return transaction
.commit()
.then(() => {
this.deferred.resolve(result);
})
.catch(commitError => {
this.handleTransactionError(commitError);
});
});
})
.catch(userPromiseError => {
this.handleTransactionError(userPromiseError);
});
}
});
}

private tryRunUpdateFunction(transaction: Transaction): Promise<T> | null {
try {
const userPromise = this.updateFunction(transaction);
if (
isNullOrUndefined(userPromise) ||
!userPromise.catch ||
!userPromise.then
) {
this.deferred.reject(
Error('Transaction callback must return a Promise')
);
return null;
}
return userPromise;
} catch (error) {
// Do not retry errors thrown by user provided updateFunction.
this.deferred.reject(error);
return null;
}
}

private handleTransactionError(error: Error): void {
if (this.retries > 0 && this.isRetryableTransactionError(error)) {
this.retries -= 1;
this.asyncQueue.enqueueAndForget(() => {
this.runWithBackOff();
return Promise.resolve();
});
} else {
this.deferred.reject(error);
}
}

private isRetryableTransactionError(error: Error): boolean {
if (error.name === 'FirebaseError') {
// In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
// non-matching document versions with ABORTED. These errors should be retried.
const code = (error as FirestoreError).code;
return (
code === 'aborted' ||
code === 'failed-precondition' ||
!isPermanentError(code)
);
}
return false;
}
}
17 changes: 14 additions & 3 deletions packages/firestore/src/remote/backoff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,17 @@ import * as log from '../util/log';
import { CancelablePromise } from '../util/promise';
const LOG_TAG = 'ExponentialBackoff';

/**
* Initial backoff time in milliseconds after an error.
* Set to 1s according to https://cloud.google.com/apis/design/errors.
*/
const DEFAULT_BACKOFF_INITIAL_DELAY_MS = 1000;

const DEFAULT_BACKOFF_FACTOR = 1.5;

/** Maximum backoff time in milliseconds */
const DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;

/**
* A helper for running delayed tasks following an exponential backoff curve
* between attempts.
Expand Down Expand Up @@ -49,18 +60,18 @@ export class ExponentialBackoff {
* Note that jitter will still be applied, so the actual delay could be as
* little as 0.5*initialDelayMs.
*/
private readonly initialDelayMs: number,
private readonly initialDelayMs: number = DEFAULT_BACKOFF_INITIAL_DELAY_MS,
/**
* The multiplier to use to determine the extended base delay after each
* attempt.
*/
private readonly backoffFactor: number,
private readonly backoffFactor: number = DEFAULT_BACKOFF_FACTOR,
/**
* The maximum base delay after which no further backoff is performed.
* Note that jitter will still be applied, so the actual delay could be as
* much as 1.5*maxDelayMs.
*/
private readonly maxDelayMs: number
private readonly maxDelayMs: number = DEFAULT_BACKOFF_MAX_DELAY_MS
) {
this.reset();
}
Expand Down
19 changes: 1 addition & 18 deletions packages/firestore/src/remote/persistent_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,6 @@ export interface PersistentStreamListener {
onClose: (err?: FirestoreError) => Promise<void>;
}

/**
* Initial backoff time in milliseconds after an error.
* Set to 1s according to https://cloud.google.com/apis/design/errors.
*/
const BACKOFF_INITIAL_DELAY_MS = 1000;

/** Maximum backoff time in milliseconds */
const BACKOFF_MAX_DELAY_MS = 60 * 1000;

const BACKOFF_FACTOR = 1.5;

/** The time a stream stays open after it is marked idle. */
const IDLE_TIMEOUT_MS = 60 * 1000;

Expand Down Expand Up @@ -188,13 +177,7 @@ export abstract class PersistentStream<
private credentialsProvider: CredentialsProvider,
protected listener: ListenerType
) {
this.backoff = new ExponentialBackoff(
queue,
connectionTimerId,
BACKOFF_INITIAL_DELAY_MS,
BACKOFF_FACTOR,
BACKOFF_MAX_DELAY_MS
);
this.backoff = new ExponentialBackoff(queue, connectionTimerId);
}

/**
Expand Down
30 changes: 22 additions & 8 deletions packages/firestore/src/util/async_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,13 @@ export enum TimerId {
ClientMetadataRefresh = 'client_metadata_refresh',

/** A timer used to periodically attempt LRU Garbage collection */
LruGarbageCollection = 'lru_garbage_collection'
LruGarbageCollection = 'lru_garbage_collection',

/**
* A timer used to retry transactions. Since there can be multiple concurrent
* transactions, multiple of these may be in the queue at a given time.
*/
RetryTransaction = 'retry_transaction'
}

/**
Expand Down Expand Up @@ -203,6 +209,9 @@ export class AsyncQueue {
// assertion sanity-checks.
private operationInProgress = false;

// List of TimerIds to fast-forward delays for.
private timerIdsToSkip: TimerId[] = [];

// Is this AsyncQueue being shut down? If true, this instance will not enqueue
// any new operations, Promises from enqueue requests will not resolve.
get isShuttingDown(): boolean {
Expand Down Expand Up @@ -319,13 +328,6 @@ export class AsyncQueue {
`Attempted to schedule an operation with a negative delay of ${delayMs}`
);

// While not necessarily harmful, we currently don't expect to have multiple
// ops with the same timer id in the queue, so defensively reject them.
assert(
!this.containsDelayedOperation(timerId),
`Attempted to schedule multiple operations with timer id ${timerId}.`
);

const delayedOp = DelayedOperation.createAndSchedule<T>(
this,
timerId,
Expand All @@ -336,6 +338,11 @@ export class AsyncQueue {
);
this.delayedOperations.push(delayedOp as DelayedOperation<unknown>);

// Fast-forward delays for timerIds that have been overriden.
if (this.timerIdsToSkip.indexOf(delayedOp.timerId) > -1) {
delayedOp.skipDelay();
}

return delayedOp;
}

Expand Down Expand Up @@ -414,6 +421,13 @@ export class AsyncQueue {
});
}

/**
* For Tests: Skip all subsequent delays for a timer id.
*/
skipDelaysForTimerId(timerId: TimerId): void {
this.timerIdsToSkip.push(timerId);
}

/** Called once a DelayedOperation is run or canceled. */
private removeDelayedOperation(op: DelayedOperation<unknown>): void {
// NOTE: indexOf / slice are O(n), but delayedOperations is expected to be small.
Expand Down
Loading