Skip to content

Commit a90317c

Browse files
author
Brian Chen
committed
Added TransactionRunner
1 parent 8e3bc3d commit a90317c

File tree

7 files changed

+156
-104
lines changed

7 files changed

+156
-104
lines changed

firebase-firestore/src/androidTest/java/com/google/firebase/firestore/TransactionTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.google.android.gms.tasks.Tasks;
3131
import com.google.firebase.firestore.FirebaseFirestoreException.Code;
3232
import com.google.firebase.firestore.testutil.IntegrationTestUtil;
33+
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
3334
import java.util.ArrayList;
3435
import java.util.Arrays;
3536
import java.util.List;
@@ -313,7 +314,7 @@ public void testIncrementTransactionally() {
313314
AtomicInteger started = new AtomicInteger(0);
314315

315316
FirebaseFirestore firestore = testFirestore();
316-
firestore.removeTransactionBackoffs();
317+
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
317318
DocumentReference doc = firestore.collection("counters").document();
318319
waitFor(doc.set(map("count", 5.0)));
319320

@@ -379,7 +380,7 @@ public void testUpdateTransactionally() {
379380
AtomicInteger counter = new AtomicInteger(0);
380381

381382
FirebaseFirestore firestore = testFirestore();
382-
firestore.removeTransactionBackoffs();
383+
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
383384
DocumentReference doc = firestore.collection("counters").document();
384385
waitFor(doc.set(map("count", 5.0, "other", "yes")));
385386

@@ -474,7 +475,7 @@ public void testUpdatePOJOTransactionally() {
474475
AtomicInteger started = new AtomicInteger(0);
475476

476477
FirebaseFirestore firestore = testFirestore();
477-
firestore.removeTransactionBackoffs();
478+
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
478479
DocumentReference doc = firestore.collection("counters").document();
479480
waitFor(doc.set(new POJO(5.0, "no", "clean")));
480481

@@ -549,7 +550,7 @@ public void testHandleReadingOneDocAndWritingAnother() {
549550
@Test
550551
public void testReadingADocTwiceWithDifferentVersions() {
551552
FirebaseFirestore firestore = testFirestore();
552-
firestore.removeTransactionBackoffs();
553+
firestore.getAsyncQueue().skipDelaysForTimerId(TimerId.RETRY_TRANSACTION);
553554
DocumentReference doc = firestore.collection("counters").document();
554555
waitFor(doc.set(map("count", 15.0)));
555556
AtomicInteger counter = new AtomicInteger(0);

firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,6 @@ public interface InstanceRegistry {
7171
private FirebaseFirestoreSettings settings;
7272
private volatile FirestoreClient client;
7373

74-
private boolean skipTransactionBackoffs = false;
75-
7674
@NonNull
7775
public static FirebaseFirestore getInstance() {
7876
FirebaseApp app = FirebaseApp.getInstance();
@@ -281,7 +279,7 @@ private <ResultT> Task<ResultT> runTransaction(
281279
updateFunction.apply(
282280
new Transaction(internalTransaction, FirebaseFirestore.this)));
283281

284-
return client.transaction(wrappedUpdateFunction, 5, skipTransactionBackoffs);
282+
return client.transaction(wrappedUpdateFunction, 5);
285283
}
286284

287285
/**
@@ -387,12 +385,6 @@ AsyncQueue getAsyncQueue() {
387385
return asyncQueue;
388386
}
389387

390-
/** For tests: Skip all transaction backoffs to make integration tests complete faster. */
391-
@VisibleForTesting
392-
void removeTransactionBackoffs() {
393-
this.skipTransactionBackoffs = true;
394-
}
395-
396388
/**
397389
* Re-enables network usage for this instance after a prior call to {@link #disableNetwork()}.
398390
*

firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@
5151
import com.google.firebase.firestore.remote.RemoteSerializer;
5252
import com.google.firebase.firestore.remote.RemoteStore;
5353
import com.google.firebase.firestore.util.AsyncQueue;
54-
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
55-
import com.google.firebase.firestore.util.ExponentialBackoff;
5654
import com.google.firebase.firestore.util.Logger;
5755
import io.grpc.Status;
5856
import java.util.Collections;
@@ -219,20 +217,10 @@ public Task<Void> write(final List<Mutation> mutations) {
219217

220218
/** Tries to execute the transaction in updateFunction up to retries times. */
221219
public <TResult> Task<TResult> transaction(
222-
Function<Transaction, Task<TResult>> updateFunction,
223-
int retries,
224-
boolean skippedTransactionBackoffs) {
220+
Function<Transaction, Task<TResult>> updateFunction, int retries) {
225221
this.verifyNotShutdown();
226-
ExponentialBackoff backoff;
227-
if (skippedTransactionBackoffs) {
228-
backoff = new ExponentialBackoff(asyncQueue, TimerId.RETRY_TRANSACTION, 1, 2, 10);
229-
} else {
230-
backoff = new ExponentialBackoff(asyncQueue, AsyncQueue.TimerId.RETRY_TRANSACTION);
231-
}
232-
final TaskCompletionSource<TResult> txTaskSource = new TaskCompletionSource<>();
233-
asyncQueue.enqueueAndForget(
234-
() -> syncEngine.transaction(asyncQueue, backoff, txTaskSource, updateFunction, retries));
235-
return txTaskSource.getTask();
222+
return AsyncQueue.callTask(
223+
asyncQueue.getExecutor(), () -> syncEngine.transaction(asyncQueue, updateFunction));
236224
}
237225

238226
/**

firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java

Lines changed: 7 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,8 @@
1717
import static com.google.firebase.firestore.util.Assert.fail;
1818
import static com.google.firebase.firestore.util.Assert.hardAssert;
1919

20-
import androidx.annotation.NonNull;
2120
import androidx.annotation.Nullable;
2221
import androidx.annotation.VisibleForTesting;
23-
import com.google.android.gms.tasks.OnCompleteListener;
2422
import com.google.android.gms.tasks.Task;
2523
import com.google.android.gms.tasks.TaskCompletionSource;
2624
import com.google.common.base.Function;
@@ -43,12 +41,10 @@
4341
import com.google.firebase.firestore.model.mutation.Mutation;
4442
import com.google.firebase.firestore.model.mutation.MutationBatch;
4543
import com.google.firebase.firestore.model.mutation.MutationBatchResult;
46-
import com.google.firebase.firestore.remote.Datastore;
4744
import com.google.firebase.firestore.remote.RemoteEvent;
4845
import com.google.firebase.firestore.remote.RemoteStore;
4946
import com.google.firebase.firestore.remote.TargetChange;
5047
import com.google.firebase.firestore.util.AsyncQueue;
51-
import com.google.firebase.firestore.util.ExponentialBackoff;
5248
import com.google.firebase.firestore.util.Logger;
5349
import com.google.firebase.firestore.util.Util;
5450
import io.grpc.Status;
@@ -264,57 +260,13 @@ private void addUserCallback(int batchId, TaskCompletionSource<Void> userTask) {
264260
*
265261
* <p>The Task returned is resolved when the transaction is fully committed.
266262
*/
267-
public <TResult> void transaction(
268-
AsyncQueue asyncQueue,
269-
ExponentialBackoff backoff,
270-
TaskCompletionSource<TResult> taskSource,
271-
Function<Transaction, Task<TResult>> updateFunction,
272-
int retries) {
273-
hardAssert(retries >= 0, "Got negative number of retries for transaction.");
274-
275-
backoff.backoffAndRun(
276-
() -> {
277-
final Transaction transaction = remoteStore.createTransaction();
278-
updateFunction
279-
.apply(transaction)
280-
.addOnCompleteListener(
281-
asyncQueue.getExecutor(),
282-
new OnCompleteListener<TResult>() {
283-
@Override
284-
public void onComplete(@NonNull Task<TResult> userTask) {
285-
if (!userTask.isSuccessful()) {
286-
if (retries > 0 && isRetryableTransactionError(userTask.getException())) {
287-
transaction(asyncQueue, backoff, taskSource, updateFunction, retries - 1);
288-
} else {
289-
taskSource.setException(userTask.getException());
290-
}
291-
} else {
292-
transaction
293-
.commit()
294-
.addOnCompleteListener(
295-
asyncQueue.getExecutor(),
296-
new OnCompleteListener<Void>() {
297-
@Override
298-
public void onComplete(@NonNull Task<Void> commitTask) {
299-
if (commitTask.isSuccessful()) {
300-
taskSource.setResult(userTask.getResult());
301-
} else if (retries > 0
302-
&& isRetryableTransactionError(commitTask.getException())) {
303-
transaction(
304-
asyncQueue,
305-
backoff,
306-
taskSource,
307-
updateFunction,
308-
retries - 1);
309-
} else {
310-
taskSource.setException(commitTask.getException());
311-
}
312-
}
313-
});
314-
}
315-
}
316-
});
317-
});
263+
public <TResult> Task<TResult> transaction(
264+
AsyncQueue asyncQueue, Function<Transaction, Task<TResult>> updateFunction) {
265+
TransactionRunner<TResult> runner =
266+
new TransactionRunner<>(asyncQueue, remoteStore, updateFunction, 5);
267+
268+
runner.runTransaction();
269+
return runner.getTask();
318270
}
319271

320272
/** Called by FirestoreClient to notify us of a new remote event. */
@@ -683,16 +635,4 @@ private boolean errorIsInteresting(Status error) {
683635

684636
return false;
685637
}
686-
687-
private boolean isRetryableTransactionError(Exception e) {
688-
if (e instanceof FirebaseFirestoreException) {
689-
// In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
690-
// non-matching document versions with ABORTED. These errors should be retried.
691-
FirebaseFirestoreException.Code code = ((FirebaseFirestoreException) e).getCode();
692-
return code == FirebaseFirestoreException.Code.ABORTED
693-
|| code == FirebaseFirestoreException.Code.FAILED_PRECONDITION
694-
|| !Datastore.isPermanentError(((FirebaseFirestoreException) e).getCode());
695-
}
696-
return false;
697-
}
698638
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.firestore.core;
16+
17+
import static com.google.firebase.firestore.util.Assert.hardAssert;
18+
19+
import androidx.annotation.NonNull;
20+
import com.google.android.gms.tasks.OnCompleteListener;
21+
import com.google.android.gms.tasks.Task;
22+
import com.google.android.gms.tasks.TaskCompletionSource;
23+
import com.google.common.base.Function;
24+
import com.google.firebase.firestore.FirebaseFirestoreException;
25+
import com.google.firebase.firestore.remote.Datastore;
26+
import com.google.firebase.firestore.remote.RemoteStore;
27+
import com.google.firebase.firestore.util.AsyncQueue;
28+
import com.google.firebase.firestore.util.AsyncQueue.TimerId;
29+
import com.google.firebase.firestore.util.ExponentialBackoff;
30+
31+
/**
32+
* TransactionRunner encapsulates the logic needed to run and retry transactions so that the caller
33+
* does not have to manage the backoff and retry count through recursive calls.
34+
*/
35+
public class TransactionRunner<TResult> {
36+
private AsyncQueue asyncQueue;
37+
private RemoteStore remoteStore;
38+
private Function<Transaction, Task<TResult>> updateFunction;
39+
private int retries;
40+
41+
private ExponentialBackoff backoff;
42+
private TaskCompletionSource<TResult> taskSource = new TaskCompletionSource<>();
43+
44+
public TransactionRunner(
45+
AsyncQueue asyncQueue,
46+
RemoteStore remoteStore,
47+
Function<Transaction, Task<TResult>> updateFunction,
48+
int retries) {
49+
hardAssert(retries >= 0, "Got negative number of retries for transaction.");
50+
51+
this.asyncQueue = asyncQueue;
52+
this.remoteStore = remoteStore;
53+
this.updateFunction = updateFunction;
54+
this.retries = retries;
55+
56+
backoff = new ExponentialBackoff(asyncQueue, TimerId.RETRY_TRANSACTION);
57+
}
58+
59+
/** Runs the transaction and sets the result in taskSource. */
60+
public void runTransaction() {
61+
backoff.backoffAndRun(
62+
() -> {
63+
final Transaction transaction = remoteStore.createTransaction();
64+
updateFunction
65+
.apply(transaction)
66+
.addOnCompleteListener(
67+
asyncQueue.getExecutor(),
68+
new OnCompleteListener<TResult>() {
69+
@Override
70+
public void onComplete(@NonNull Task<TResult> userTask) {
71+
if (!userTask.isSuccessful()) {
72+
if (retries > 0 && isRetryableTransactionError(userTask.getException())) {
73+
retries -= 1;
74+
runTransaction();
75+
} else {
76+
taskSource.setException(userTask.getException());
77+
}
78+
} else {
79+
transaction
80+
.commit()
81+
.addOnCompleteListener(
82+
asyncQueue.getExecutor(),
83+
new OnCompleteListener<Void>() {
84+
@Override
85+
public void onComplete(@NonNull Task<Void> commitTask) {
86+
if (commitTask.isSuccessful()) {
87+
taskSource.setResult(userTask.getResult());
88+
} else if (retries > 0
89+
&& isRetryableTransactionError(commitTask.getException())) {
90+
retries -= 1;
91+
runTransaction();
92+
} else {
93+
taskSource.setException(commitTask.getException());
94+
}
95+
}
96+
});
97+
}
98+
}
99+
});
100+
});
101+
}
102+
103+
/** Returns the result of the transaction after it has been run. */
104+
public Task<TResult> getTask() {
105+
return taskSource.getTask();
106+
}
107+
108+
private static boolean isRetryableTransactionError(Exception e) {
109+
if (e instanceof FirebaseFirestoreException) {
110+
// In transactions, the backend will fail outdated reads with FAILED_PRECONDITION and
111+
// non-matching document versions with ABORTED. These errors should be retried.
112+
FirebaseFirestoreException.Code code = ((FirebaseFirestoreException) e).getCode();
113+
return code == FirebaseFirestoreException.Code.ABORTED
114+
|| code == FirebaseFirestoreException.Code.FAILED_PRECONDITION
115+
|| !Datastore.isPermanentError(((FirebaseFirestoreException) e).getCode());
116+
}
117+
return false;
118+
}
119+
}

firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,10 @@ public enum TimerId {
7070
ONLINE_STATE_TIMEOUT,
7171
/** A timer used to periodically attempt LRU Garbage collection */
7272
GARBAGE_COLLECTION,
73-
/** A timer used to retry transactions */
73+
/**
74+
* A timer used to retry transactions. Since there can be multiple concurrent transactions,
75+
* multiple of these may be in the queue at a given time.
76+
*/
7477
RETRY_TRANSACTION
7578
}
7679

@@ -380,6 +383,8 @@ private void setCorePoolSize(int size) {
380383
// theoretical removal speed, except this list will always be small so ArrayList is fine.
381384
private final ArrayList<DelayedTask> delayedTasks;
382385

386+
private final ArrayList<TimerId> timerIdsToSkip = new ArrayList<>();
387+
383388
public AsyncQueue() {
384389
delayedTasks = new ArrayList<>();
385390
executor = new SynchronizedShutdownAwareExecutor();
@@ -476,9 +481,18 @@ public DelayedTask enqueueAfterDelay(TimerId timerId, long delayMs, Runnable tas
476481
DelayedTask delayedTask = createAndScheduleDelayedTask(timerId, delayMs, task);
477482
delayedTasks.add(delayedTask);
478483

484+
for (TimerId timerIdToSkip : timerIdsToSkip) {
485+
if (delayedTask.timerId == timerIdToSkip) {
486+
delayedTask.skipDelay();
487+
}
488+
}
479489
return delayedTask;
480490
}
481491

492+
public void skipDelaysForTimerId(TimerId timerId) {
493+
timerIdsToSkip.add(timerId);
494+
}
495+
482496
/**
483497
* Immediately stops running any scheduled tasks and causes a "panic" (through crashing the app).
484498
*

firebase-firestore/src/main/java/com/google/firebase/firestore/util/ExponentialBackoff.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,14 +77,12 @@ public ExponentialBackoff(
7777
}
7878

7979
public ExponentialBackoff(AsyncQueue queue, AsyncQueue.TimerId timerId) {
80-
this.queue = queue;
81-
this.timerId = timerId;
82-
this.initialDelayMs = DEFAULT_BACKOFF_INITIAL_DELAY_MS;
83-
this.backoffFactor = DEFAULT_BACKOFF_FACTOR;
84-
this.maxDelayMs = DEFAULT_BACKOFF_MAX_DELAY_MS;
85-
this.lastAttemptTime = new Date().getTime();
86-
87-
reset();
80+
this(
81+
queue,
82+
timerId,
83+
DEFAULT_BACKOFF_INITIAL_DELAY_MS,
84+
DEFAULT_BACKOFF_FACTOR,
85+
DEFAULT_BACKOFF_MAX_DELAY_MS);
8886
}
8987

9088
/**

0 commit comments

Comments
 (0)