Skip to content

Retrying transactions with backoff #698

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 11 commits into from
Aug 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions firebase-firestore/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Unreleased
- [changed] Transactions now perform exponential backoff before retrying.
This means transactions on highly contended documents are more likely to
succeed.

# 21.0.0
- [changed] Transactions are now more flexible. Some sequences of operations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.android.gms.tasks.Tasks;
import com.google.firebase.firestore.FirebaseFirestoreException.Code;
import com.google.firebase.firestore.testutil.IntegrationTestUtil;
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -313,6 +314,7 @@ public void testIncrementTransactionally() {
AtomicInteger started = new AtomicInteger(0);

FirebaseFirestore firestore = testFirestore();
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
DocumentReference doc = firestore.collection("counters").document();
waitFor(doc.set(map("count", 5.0)));

Expand Down Expand Up @@ -378,6 +380,7 @@ public void testUpdateTransactionally() {
AtomicInteger counter = new AtomicInteger(0);

FirebaseFirestore firestore = testFirestore();
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
DocumentReference doc = firestore.collection("counters").document();
waitFor(doc.set(map("count", 5.0, "other", "yes")));

Expand Down Expand Up @@ -472,6 +475,7 @@ public void testUpdatePOJOTransactionally() {
AtomicInteger started = new AtomicInteger(0);

FirebaseFirestore firestore = testFirestore();
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
DocumentReference doc = firestore.collection("counters").document();
waitFor(doc.set(new POJO(5.0, "no", "clean")));

Expand Down Expand Up @@ -546,6 +550,7 @@ public void testHandleReadingOneDocAndWritingAnother() {
@Test
public void testReadingADocTwiceWithDifferentVersions() {
FirebaseFirestore firestore = testFirestore();
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
DocumentReference doc = firestore.collection("counters").document();
waitFor(doc.set(map("count", 15.0)));
AtomicInteger counter = new AtomicInteger(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,7 @@ public <TResult> Task<TResult> transaction(
Function<Transaction, Task<TResult>> updateFunction, int retries) {
this.verifyNotShutdown();
return AsyncQueue.callTask(
asyncQueue.getExecutor(),
() -> syncEngine.transaction(asyncQueue, updateFunction, retries));
asyncQueue.getExecutor(), () -> syncEngine.transaction(asyncQueue, updateFunction));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import androidx.annotation.VisibleForTesting;
import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.TaskCompletionSource;
import com.google.android.gms.tasks.Tasks;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.firebase.database.collection.ImmutableSortedMap;
Expand All @@ -42,7 +41,6 @@
import com.google.firebase.firestore.model.mutation.Mutation;
import com.google.firebase.firestore.model.mutation.MutationBatch;
import com.google.firebase.firestore.model.mutation.MutationBatchResult;
import com.google.firebase.firestore.remote.Datastore;
import com.google.firebase.firestore.remote.RemoteEvent;
import com.google.firebase.firestore.remote.RemoteStore;
import com.google.firebase.firestore.remote.TargetChange;
Expand Down Expand Up @@ -250,9 +248,10 @@ private void addUserCallback(int batchId, TaskCompletionSource<Void> userTask) {
/**
* Takes an updateFunction in which a set of reads and writes can be performed atomically. In the
* updateFunction, the client can read and write values using the supplied transaction object.
* After the updateFunction, all changes will be committed. If some other client has changed any
* of the data referenced, then the updateFunction will be called again. If the updateFunction
* still fails after the given number of retries, then the transaction will be rejected.
* After the updateFunction, all changes will be committed. If a retryable error occurs (e.g. some
* other client has changed any of the data referenced), then the updateFunction will be called
* again after a backoff. If the updateFunction still fails after all retries, then the
* transaction will be rejected.
*
* <p>The transaction object passed to the updateFunction contains methods for accessing documents
* and collections. Unlike other datastore access, data accessed with the transaction will not
Expand All @@ -262,35 +261,8 @@ private void addUserCallback(int batchId, TaskCompletionSource<Void> userTask) {
* <p>The Task returned is resolved when the transaction is fully committed.
*/
public <TResult> Task<TResult> transaction(
AsyncQueue asyncQueue, Function<Transaction, Task<TResult>> updateFunction, int retries) {
hardAssert(retries >= 0, "Got negative number of retries for transaction.");
final Transaction transaction = remoteStore.createTransaction();
return updateFunction
.apply(transaction)
.continueWithTask(
asyncQueue.getExecutor(),
userTask -> {
if (!userTask.isSuccessful()) {
if (retries > 0 && isRetryableTransactionError(userTask.getException())) {
return transaction(asyncQueue, updateFunction, retries - 1);
}
return userTask;
}
return transaction
.commit()
.continueWithTask(
asyncQueue.getExecutor(),
commitTask -> {
if (commitTask.isSuccessful()) {
return Tasks.forResult(userTask.getResult());
}
Exception e = commitTask.getException();
if (retries > 0 && isRetryableTransactionError(e)) {
return transaction(asyncQueue, updateFunction, retries - 1);
}
return Tasks.forException(e);
});
});
AsyncQueue asyncQueue, Function<Transaction, Task<TResult>> updateFunction) {
return new TransactionRunner<TResult>(asyncQueue, remoteStore, updateFunction).run();
}

/** Called by FirestoreClient to notify us of a new remote event. */
Expand Down Expand Up @@ -659,16 +631,4 @@ private boolean errorIsInteresting(Status error) {

return false;
}

/**
 * Decides whether a failed transaction attempt is worth retrying.
 *
 * @param e the exception the failed attempt completed with.
 * @return true only for a FirebaseFirestoreException whose code is ABORTED, FAILED_PRECONDITION,
 *     or one that Datastore does not classify as a permanent error.
 */
private boolean isRetryableTransactionError(Exception e) {
  // Anything that is not a Firestore exception is never retried.
  if (!(e instanceof FirebaseFirestoreException)) {
    return false;
  }

  FirebaseFirestoreException.Code errorCode = ((FirebaseFirestoreException) e).getCode();

  // In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
  // non-matching document versions with ABORTED. These errors should be retried.
  boolean staleReadOrVersionConflict =
      errorCode == FirebaseFirestoreException.Code.ABORTED
          || errorCode == FirebaseFirestoreException.Code.FAILED_PRECONDITION;

  return staleReadOrVersionConflict || !Datastore.isPermanentError(errorCode);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.google.firebase.firestore.core;

import androidx.annotation.NonNull;
import com.google.android.gms.tasks.Task;
import com.google.android.gms.tasks.TaskCompletionSource;
import com.google.common.base.Function;
import com.google.firebase.firestore.FirebaseFirestoreException;
import com.google.firebase.firestore.remote.Datastore;
import com.google.firebase.firestore.remote.RemoteStore;
import com.google.firebase.firestore.util.AsyncQueue;
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
import com.google.firebase.firestore.util.ExponentialBackoff;

/**
 * TransactionRunner encapsulates the logic needed to run and retry transactions with backoff.
 *
 * <p>Each attempt creates a fresh {@link Transaction} from the {@link RemoteStore}, passes it to
 * the supplied update function and, if that function's Task succeeds, commits the transaction. A
 * failed attempt is re-run (at most {@code RETRY_COUNT} times) when its error is considered
 * retryable; otherwise the Task returned by {@link #run()} fails with that error.
 *
 * @param <TResult> the type of the value produced by the update function.
 */
public class TransactionRunner<TResult> {
  /** Maximum number of retries allowed after the initial attempt. */
  private static final int RETRY_COUNT = 5;

  private final AsyncQueue asyncQueue;
  private final RemoteStore remoteStore;
  private final Function<Transaction, Task<TResult>> updateFunction;
  private final ExponentialBackoff backoff;
  private final TaskCompletionSource<TResult> taskSource = new TaskCompletionSource<>();

  /** Retries still available; decremented each time a failed attempt is rescheduled. */
  private int retriesLeft = RETRY_COUNT;

  /**
   * @param asyncQueue queue whose executor receives the completion callbacks and on which the
   *     backoff is scheduled under {@code TimerId.RETRY_TRANSACTION}.
   * @param remoteStore source of a new {@link Transaction} for every attempt.
   * @param updateFunction user logic to execute against the transaction on every attempt.
   */
  public TransactionRunner(
      AsyncQueue asyncQueue,
      RemoteStore remoteStore,
      Function<Transaction, Task<TResult>> updateFunction) {
    this.asyncQueue = asyncQueue;
    this.remoteStore = remoteStore;
    this.updateFunction = updateFunction;
    this.backoff = new ExponentialBackoff(asyncQueue, TimerId.RETRY_TRANSACTION);
  }

  /** Runs the transaction and returns a Task containing the result. */
  public Task<TResult> run() {
    runWithBackoff();
    return taskSource.getTask();
  }

  /** Schedules one attempt through the backoff helper: create, apply update function, commit. */
  private void runWithBackoff() {
    backoff.backoffAndRun(
        () -> {
          final Transaction transaction = remoteStore.createTransaction();
          updateFunction
              .apply(transaction)
              .addOnCompleteListener(
                  asyncQueue.getExecutor(),
                  (@NonNull Task<TResult> userTask) ->
                      onUpdateFunctionComplete(transaction, userTask));
        });
  }

  /** Commits the transaction if the update function succeeded; otherwise handles its error. */
  private void onUpdateFunctionComplete(Transaction transaction, Task<TResult> userTask) {
    if (!userTask.isSuccessful()) {
      handleTransactionError(userTask);
      return;
    }
    transaction
        .commit()
        .addOnCompleteListener(
            asyncQueue.getExecutor(),
            (@NonNull Task<Void> commitTask) -> onCommitComplete(userTask, commitTask));
  }

  /** Publishes the update function's result once the commit succeeded, else handles the error. */
  private void onCommitComplete(Task<TResult> userTask, Task<Void> commitTask) {
    if (commitTask.isSuccessful()) {
      taskSource.setResult(userTask.getResult());
    } else {
      handleTransactionError(commitTask);
    }
  }

  /**
   * Retries the transaction when retries remain and the failure is retryable; otherwise fails the
   * Task handed out by {@link #run()} with the failure's exception.
   *
   * @param task the failed update-function or commit Task; its result type is irrelevant here.
   */
  private void handleTransactionError(Task<?> task) {
    Exception error = task.getException();
    if (retriesLeft > 0 && isRetryableTransactionError(error)) {
      retriesLeft -= 1;
      runWithBackoff();
    } else {
      taskSource.setException(error);
    }
  }

  /**
   * Returns whether a failed attempt should be retried. Only {@link FirebaseFirestoreException}s
   * are ever retried.
   */
  private static boolean isRetryableTransactionError(Exception e) {
    if (!(e instanceof FirebaseFirestoreException)) {
      return false;
    }
    // In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
    // non-matching document versions with ABORTED. These errors should be retried.
    FirebaseFirestoreException.Code code = ((FirebaseFirestoreException) e).getCode();
    return code == FirebaseFirestoreException.Code.ABORTED
        || code == FirebaseFirestoreException.Code.FAILED_PRECONDITION
        || !Datastore.isPermanentError(code);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,12 @@ public enum TimerId {
*/
ONLINE_STATE_TIMEOUT,
/** A timer used to periodically attempt LRU Garbage collection */
GARBAGE_COLLECTION
GARBAGE_COLLECTION,
/**
* A timer used to retry transactions. Since there can be multiple concurrent transactions,
* multiple of these may be in the queue at a given time.
*/
RETRY_TRANSACTION
}

/**
Expand Down Expand Up @@ -378,6 +383,9 @@ private void setCorePoolSize(int size) {
// theoretical removal speed, except this list will always be small so ArrayList is fine.
private final ArrayList<DelayedTask> delayedTasks;

// List of TimerIds to fast-forward delays for.
private final ArrayList<TimerId> timerIdsToSkip = new ArrayList<>();

public AsyncQueue() {
delayedTasks = new ArrayList<>();
executor = new SynchronizedShutdownAwareExecutor();
Expand Down Expand Up @@ -471,19 +479,26 @@ public void enqueueAndForget(Runnable task) {
* @return A DelayedTask instance that can be used for cancellation.
*/
public DelayedTask enqueueAfterDelay(TimerId timerId, long delayMs, Runnable task) {
  // Several tasks may legitimately share a timer id (RETRY_TRANSACTION is documented as allowing
  // multiple queued timers for concurrent transactions), so duplicates are accepted here instead
  // of being rejected with an assertion.
  DelayedTask delayedTask = createAndScheduleDelayedTask(timerId, delayMs, task);
  delayedTasks.add(delayedTask);

  // Fast-forward delays for timerIds that have been overridden.
  if (timerIdsToSkip.contains(delayedTask.timerId)) {
    delayedTask.skipDelay();
  }
  return delayedTask;
}

/**
 * For Tests: Skip all subsequent delays for a timer id.
 *
 * <p>Registering a timer id that is already being skipped has no additional effect.
 *
 * @param timerId The timerId to skip delays for.
 */
@VisibleForTesting
public void skipDelaysForTimerId(TimerId timerId) {
  // enqueueAfterDelay only tests membership, so keep a single entry per timer id rather than
  // growing the list on repeated calls.
  if (!timerIdsToSkip.contains(timerId)) {
    timerIdsToSkip.add(timerId);
  }
}

/**
* Immediately stops running any scheduled tasks and causes a "panic" (through crashing the app).
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,18 @@

/** Helper to implement exponential backoff. */
public class ExponentialBackoff {

/**
* Initial backoff time in milliseconds after an error. Set to 1s according to
* https://cloud.google.com/apis/design/errors.
*/
public static final long DEFAULT_BACKOFF_INITIAL_DELAY_MS = 1000;

public static final double DEFAULT_BACKOFF_FACTOR = 1.5;

/** Maximum backoff time in milliseconds */
public static final long DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;

private final AsyncQueue queue;
private final TimerId timerId;
private final long initialDelayMs;
Expand Down Expand Up @@ -64,6 +76,15 @@ public ExponentialBackoff(
reset();
}

/**
 * Creates an ExponentialBackoff that uses the default settings: an initial delay of
 * {@code DEFAULT_BACKOFF_INITIAL_DELAY_MS} (1000 ms), a factor of {@code DEFAULT_BACKOFF_FACTOR}
 * (1.5) and a maximum delay of {@code DEFAULT_BACKOFF_MAX_DELAY_MS} (60 s).
 *
 * @param queue the AsyncQueue passed through to the full constructor.
 * @param timerId the timer id passed through to the full constructor.
 */
public ExponentialBackoff(AsyncQueue queue, AsyncQueue.TimerId timerId) {
this(
queue,
timerId,
DEFAULT_BACKOFF_INITIAL_DELAY_MS,
DEFAULT_BACKOFF_FACTOR,
DEFAULT_BACKOFF_MAX_DELAY_MS);
}

/**
* Resets the backoff delay.
*
Expand Down