Skip to content

Commit 137fb97

Browse files
author
Brian Chen
authored
Retrying transactions with backoff (#698)
1 parent bcf86b8 commit 137fb97

File tree

7 files changed

+165
-56
lines changed

7 files changed

+165
-56
lines changed

firebase-firestore/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
# Unreleased
2+
- [changed] Transactions now perform exponential backoff before retrying.
3+
This means transactions on highly contended documents are more likely to
4+
succeed.
25

36
# 21.0.0
47
- [changed] Transactions are now more flexible. Some sequences of operations

firebase-firestore/src/androidTest/java/com/google/firebase/firestore/TransactionTest.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.android.gms.tasks.Tasks;
3131
import com.google.firebase.firestore.FirebaseFirestoreException.Code;
3232
import com.google.firebase.firestore.testutil.IntegrationTestUtil;
33+
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
3334
import java.util.ArrayList;
3435
import java.util.Arrays;
3536
import java.util.List;
@@ -313,6 +314,7 @@ public void testIncrementTransactionally() {
313314
AtomicInteger started = new AtomicInteger(0);
314315

315316
FirebaseFirestore firestore = testFirestore();
317+
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
316318
DocumentReference doc = firestore.collection("counters").document();
317319
waitFor(doc.set(map("count", 5.0)));
318320

@@ -378,6 +380,7 @@ public void testUpdateTransactionally() {
378380
AtomicInteger counter = new AtomicInteger(0);
379381

380382
FirebaseFirestore firestore = testFirestore();
383+
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
381384
DocumentReference doc = firestore.collection("counters").document();
382385
waitFor(doc.set(map("count", 5.0, "other", "yes")));
383386

@@ -472,6 +475,7 @@ public void testUpdatePOJOTransactionally() {
472475
AtomicInteger started = new AtomicInteger(0);
473476

474477
FirebaseFirestore firestore = testFirestore();
478+
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
475479
DocumentReference doc = firestore.collection("counters").document();
476480
waitFor(doc.set(new POJO(5.0, "no", "clean")));
477481

@@ -546,6 +550,7 @@ public void testHandleReadingOneDocAndWritingAnother() {
546550
@Test
547551
public void testReadingADocTwiceWithDifferentVersions() {
548552
FirebaseFirestore firestore = testFirestore();
553+
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
549554
DocumentReference doc = firestore.collection("counters").document();
550555
waitFor(doc.set(map("count", 15.0)));
551556
AtomicInteger counter = new AtomicInteger(0);

firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -219,8 +219,7 @@ public <TResult> Task<TResult> transaction(
219219
Function<Transaction, Task<TResult>> updateFunction, int retries) {
220220
this.verifyNotShutdown();
221221
return AsyncQueue.callTask(
222-
asyncQueue.getExecutor(),
223-
() -> syncEngine.transaction(asyncQueue, updateFunction, retries));
222+
asyncQueue.getExecutor(), () -> syncEngine.transaction(asyncQueue, updateFunction));
224223
}
225224

226225
/**

firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import androidx.annotation.VisibleForTesting;
2222
import com.google.android.gms.tasks.Task;
2323
import com.google.android.gms.tasks.TaskCompletionSource;
24-
import com.google.android.gms.tasks.Tasks;
2524
import com.google.common.base.Function;
2625
import com.google.common.collect.Lists;
2726
import com.google.firebase.database.collection.ImmutableSortedMap;
@@ -42,7 +41,6 @@
4241
import com.google.firebase.firestore.model.mutation.Mutation;
4342
import com.google.firebase.firestore.model.mutation.MutationBatch;
4443
import com.google.firebase.firestore.model.mutation.MutationBatchResult;
45-
import com.google.firebase.firestore.remote.Datastore;
4644
import com.google.firebase.firestore.remote.RemoteEvent;
4745
import com.google.firebase.firestore.remote.RemoteStore;
4846
import com.google.firebase.firestore.remote.TargetChange;
@@ -250,9 +248,10 @@ private void addUserCallback(int batchId, TaskCompletionSource<Void> userTask) {
250248
/**
251249
* Takes an updateFunction in which a set of reads and writes can be performed atomically. In the
252250
* updateFunction, the client can read and write values using the supplied transaction object.
253-
* After the updateFunction, all changes will be committed. If some other client has changed any
254-
* of the data referenced, then the updateFunction will be called again. If the updateFunction
255-
* still fails after the given number of retries, then the transaction will be rejected.
251+
* After the updateFunction, all changes will be committed. If a retryable error occurs (ex: some
252+
* other client has changed any of the data referenced), then the updateFunction will be called
253+
* again after a backoff. If the updateFunction still fails after all retries, then the
254+
* transaction will be rejected.
256255
*
257256
* <p>The transaction object passed to the updateFunction contains methods for accessing documents
258257
* and collections. Unlike other datastore access, data accessed with the transaction will not
@@ -262,35 +261,8 @@ private void addUserCallback(int batchId, TaskCompletionSource<Void> userTask) {
262261
* <p>The Task returned is resolved when the transaction is fully committed.
263262
*/
264263
public <TResult> Task<TResult> transaction(
265-
AsyncQueue asyncQueue, Function<Transaction, Task<TResult>> updateFunction, int retries) {
266-
hardAssert(retries >= 0, "Got negative number of retries for transaction.");
267-
final Transaction transaction = remoteStore.createTransaction();
268-
return updateFunction
269-
.apply(transaction)
270-
.continueWithTask(
271-
asyncQueue.getExecutor(),
272-
userTask -> {
273-
if (!userTask.isSuccessful()) {
274-
if (retries > 0 && isRetryableTransactionError(userTask.getException())) {
275-
return transaction(asyncQueue, updateFunction, retries - 1);
276-
}
277-
return userTask;
278-
}
279-
return transaction
280-
.commit()
281-
.continueWithTask(
282-
asyncQueue.getExecutor(),
283-
commitTask -> {
284-
if (commitTask.isSuccessful()) {
285-
return Tasks.forResult(userTask.getResult());
286-
}
287-
Exception e = commitTask.getException();
288-
if (retries > 0 && isRetryableTransactionError(e)) {
289-
return transaction(asyncQueue, updateFunction, retries - 1);
290-
}
291-
return Tasks.forException(e);
292-
});
293-
});
264+
AsyncQueue asyncQueue, Function<Transaction, Task<TResult>> updateFunction) {
265+
return new TransactionRunner<TResult>(asyncQueue, remoteStore, updateFunction).run();
294266
}
295267

296268
/** Called by FirestoreClient to notify us of a new remote event. */
@@ -660,16 +632,4 @@ private boolean errorIsInteresting(Status error) {
660632

661633
return false;
662634
}
663-
664-
private boolean isRetryableTransactionError(Exception e) {
665-
if (e instanceof FirebaseFirestoreException) {
666-
// In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
667-
// non-matching document versions with ABORTED. These errors should be retried.
668-
FirebaseFirestoreException.Code code = ((FirebaseFirestoreException) e).getCode();
669-
return code == FirebaseFirestoreException.Code.ABORTED
670-
|| code == FirebaseFirestoreException.Code.FAILED_PRECONDITION
671-
|| !Datastore.isPermanentError(((FirebaseFirestoreException) e).getCode());
672-
}
673-
return false;
674-
}
675635
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.firestore.core;
16+
17+
import androidx.annotation.NonNull;
18+
import com.google.android.gms.tasks.Task;
19+
import com.google.android.gms.tasks.TaskCompletionSource;
20+
import com.google.common.base.Function;
21+
import com.google.firebase.firestore.FirebaseFirestoreException;
22+
import com.google.firebase.firestore.remote.Datastore;
23+
import com.google.firebase.firestore.remote.RemoteStore;
24+
import com.google.firebase.firestore.util.AsyncQueue;
25+
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
26+
import com.google.firebase.firestore.util.ExponentialBackoff;
27+
28+
/** TransactionRunner encapsulates the logic needed to run and retry transactions with backoff. */
29+
public class TransactionRunner<TResult> {
30+
private static final int RETRY_COUNT = 5;
31+
private AsyncQueue asyncQueue;
32+
private RemoteStore remoteStore;
33+
private Function<Transaction, Task<TResult>> updateFunction;
34+
private int retriesLeft;
35+
36+
private ExponentialBackoff backoff;
37+
private TaskCompletionSource<TResult> taskSource = new TaskCompletionSource<>();
38+
39+
public TransactionRunner(
40+
AsyncQueue asyncQueue,
41+
RemoteStore remoteStore,
42+
Function<Transaction, Task<TResult>> updateFunction) {
43+
44+
this.asyncQueue = asyncQueue;
45+
this.remoteStore = remoteStore;
46+
this.updateFunction = updateFunction;
47+
this.retriesLeft = RETRY_COUNT;
48+
49+
backoff = new ExponentialBackoff(asyncQueue, TimerId.RETRY_TRANSACTION);
50+
}
51+
52+
/** Runs the transaction and returns a Task containing the result. */
53+
public Task<TResult> run() {
54+
runWithBackoff();
55+
return taskSource.getTask();
56+
}
57+
58+
private void runWithBackoff() {
59+
backoff.backoffAndRun(
60+
() -> {
61+
final Transaction transaction = remoteStore.createTransaction();
62+
updateFunction
63+
.apply(transaction)
64+
.addOnCompleteListener(
65+
asyncQueue.getExecutor(),
66+
(@NonNull Task<TResult> userTask) -> {
67+
if (!userTask.isSuccessful()) {
68+
handleTransactionError(userTask);
69+
} else {
70+
transaction
71+
.commit()
72+
.addOnCompleteListener(
73+
asyncQueue.getExecutor(),
74+
(@NonNull Task<Void> commitTask) -> {
75+
if (commitTask.isSuccessful()) {
76+
taskSource.setResult(userTask.getResult());
77+
} else {
78+
handleTransactionError(commitTask);
79+
}
80+
});
81+
}
82+
});
83+
});
84+
}
85+
86+
private void handleTransactionError(Task task) {
87+
if (retriesLeft > 0 && isRetryableTransactionError(task.getException())) {
88+
retriesLeft -= 1;
89+
runWithBackoff();
90+
} else {
91+
taskSource.setException(task.getException());
92+
}
93+
}
94+
95+
private static boolean isRetryableTransactionError(Exception e) {
96+
if (e instanceof FirebaseFirestoreException) {
97+
// In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
98+
// non-matching document versions with ABORTED. These errors should be retried.
99+
FirebaseFirestoreException.Code code = ((FirebaseFirestoreException) e).getCode();
100+
return code == FirebaseFirestoreException.Code.ABORTED
101+
|| code == FirebaseFirestoreException.Code.FAILED_PRECONDITION
102+
|| !Datastore.isPermanentError(((FirebaseFirestoreException) e).getCode());
103+
}
104+
return false;
105+
}
106+
}

firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,12 @@ public enum TimerId {
6969
*/
7070
ONLINE_STATE_TIMEOUT,
7171
/** A timer used to periodically attempt LRU Garbage collection */
72-
GARBAGE_COLLECTION
72+
GARBAGE_COLLECTION,
73+
/**
74+
* A timer used to retry transactions. Since there can be multiple concurrent transactions,
75+
* multiple of these may be in the queue at a given time.
76+
*/
77+
RETRY_TRANSACTION
7378
}
7479

7580
/**
@@ -378,6 +383,9 @@ private void setCorePoolSize(int size) {
378383
// theoretical removal speed, except this list will always be small so ArrayList is fine.
379384
private final ArrayList<DelayedTask> delayedTasks;
380385

386+
// List of TimerIds to fast-forward delays for.
387+
private final ArrayList<TimerId> timerIdsToSkip = new ArrayList<>();
388+
381389
public AsyncQueue() {
382390
delayedTasks = new ArrayList<>();
383391
executor = new SynchronizedShutdownAwareExecutor();
@@ -471,19 +479,26 @@ public void enqueueAndForget(Runnable task) {
471479
* @return A DelayedTask instance that can be used for cancellation.
472480
*/
473481
public DelayedTask enqueueAfterDelay(TimerId timerId, long delayMs, Runnable task) {
474-
// While not necessarily harmful, we currently don't expect to have multiple tasks with the same
475-
// timer id in the queue, so defensively reject them.
476-
hardAssert(
477-
!containsDelayedTask(timerId),
478-
"Attempted to schedule multiple operations with timer id %s.",
479-
timerId);
480-
481482
DelayedTask delayedTask = createAndScheduleDelayedTask(timerId, delayMs, task);
482483
delayedTasks.add(delayedTask);
483484

485+
// Fast-forward delays for timerIds that have been overridden.
486+
if (timerIdsToSkip.contains(delayedTask.timerId)) {
487+
delayedTask.skipDelay();
488+
}
484489
return delayedTask;
485490
}
486491

492+
/**
493+
* For Tests: Skip all subsequent delays for a timer id.
494+
*
495+
* @param timerId The timerId to skip delays for.
496+
*/
497+
@VisibleForTesting
498+
public void skipDelaysForTimerId(TimerId timerId) {
499+
timerIdsToSkip.add(timerId);
500+
}
501+
487502
/**
488503
* Immediately stops running any scheduled tasks and causes a "panic" (through crashing the app).
489504
*

firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,18 @@
2020

2121
/** Helper to implement exponential backoff. */
2222
public class ExponentialBackoff {
23+
24+
/**
25+
* Initial backoff time in milliseconds after an error. Set to 1s according to
26+
* https://cloud.google.com/apis/design/errors.
27+
*/
28+
public static final long DEFAULT_BACKOFF_INITIAL_DELAY_MS = 1000;
29+
30+
public static final double DEFAULT_BACKOFF_FACTOR = 1.5;
31+
32+
/** Maximum backoff time in milliseconds */
33+
public static final long DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;
34+
2335
private final AsyncQueue queue;
2436
private final TimerId timerId;
2537
private final long initialDelayMs;
@@ -64,6 +76,15 @@ public ExponentialBackoff(
6476
reset();
6577
}
6678

79+
public ExponentialBackoff(AsyncQueue queue, AsyncQueue.TimerId timerId) {
80+
this(
81+
queue,
82+
timerId,
83+
DEFAULT_BACKOFF_INITIAL_DELAY_MS,
84+
DEFAULT_BACKOFF_FACTOR,
85+
DEFAULT_BACKOFF_MAX_DELAY_MS);
86+
}
87+
6788
/**
6889
* Resets the backoff delay.
6990
*

0 commit comments

Comments
 (0)