Skip to content

Commit e4a3644

Browse files
Realize Android Tasks are slow
1 parent 4fa2fef commit e4a3644

File tree

4 files changed

+259
-64
lines changed

4 files changed

+259
-64
lines changed

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteRemoteDocumentCache.java

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,23 +17,21 @@
1717
import static com.google.firebase.firestore.util.Assert.fail;
1818
import static com.google.firebase.firestore.util.Assert.hardAssert;
1919

20-
import com.google.android.gms.tasks.Task;
21-
import com.google.android.gms.tasks.Tasks;
2220
import com.google.firebase.database.collection.ImmutableSortedMap;
2321
import com.google.firebase.firestore.core.Query;
2422
import com.google.firebase.firestore.model.Document;
2523
import com.google.firebase.firestore.model.DocumentCollections;
2624
import com.google.firebase.firestore.model.DocumentKey;
2725
import com.google.firebase.firestore.model.MaybeDocument;
2826
import com.google.firebase.firestore.model.ResourcePath;
29-
import com.google.firebase.firestore.util.Executors;
30-
import com.google.firebase.firestore.util.Util;
27+
import com.google.firebase.firestore.util.TaskQueue;
3128
import com.google.protobuf.InvalidProtocolBufferException;
3229
import com.google.protobuf.MessageLite;
3330
import java.util.ArrayList;
3431
import java.util.HashMap;
3532
import java.util.List;
3633
import java.util.Map;
34+
import java.util.concurrent.ExecutionException;
3735
import javax.annotation.Nullable;
3836

3937
final class SQLiteRemoteDocumentCache implements RemoteDocumentCache {
@@ -123,7 +121,7 @@ public ImmutableSortedMap<DocumentKey, Document> getAllDocumentsMatchingQuery(Qu
123121
String prefixPath = EncodedPath.encode(prefix);
124122
String prefixSuccessorPath = EncodedPath.prefixSuccessor(prefixPath);
125123

126-
List<Task<Document>> pendingTasks = new ArrayList<>();
124+
TaskQueue<Document> taskQueue = new TaskQueue<>();
127125

128126
db.query("SELECT path, contents FROM remote_documents WHERE path >= ? AND path < ?")
129127
.binding(prefixPath, prefixSuccessorPath)
@@ -141,40 +139,48 @@ public ImmutableSortedMap<DocumentKey, Document> getAllDocumentsMatchingQuery(Qu
141139
return;
142140
}
143141

144-
byte[] rawContents = row.getBlob(1);
145-
146-
pendingTasks.add(
147-
Tasks.call(
148-
// Since scheduling background tasks incurs overhead, we only dispatch to a
149-
// background thread if there are still some documents remaining.
150-
row.isLast() ? Executors.DIRECT_EXECUTOR : Executors.BACKGROUND_EXECUTOR,
151-
() -> {
152-
MaybeDocument maybeDoc = decodeMaybeDocument(rawContents);
153-
if (!(maybeDoc instanceof Document)) {
154-
return null;
155-
}
156-
if (!query.matches((Document) maybeDoc)) {
157-
return null;
158-
}
159-
return (Document) maybeDoc;
160-
}));
142+
byte[] rawDocument = row.getBlob(1);
143+
144+
if (!row.isLast()) {
145+
taskQueue.enqueueInBackground(() -> this.processDocumentResult(rawDocument, query));
146+
} else {
147+
// Enqueue the last document inline so that we don't block on background execution.
148+
taskQueue.enqueueInline(() -> this.processDocumentResult(rawDocument, query));
149+
}
161150
});
162151

163152
ImmutableSortedMap<DocumentKey, Document> matchingDocuments =
164153
DocumentCollections.emptyDocumentMap();
165-
List<Document> results = Util.await(Util.whenAllComplete(pendingTasks));
166-
for (Document doc : results) {
167-
if (doc != null) {
168-
matchingDocuments = matchingDocuments.insert(doc.getKey(), doc);
154+
155+
try {
156+
List<Document> results = taskQueue.awaitResults();
157+
for (Document doc : results) {
158+
if (doc != null) {
159+
matchingDocuments = matchingDocuments.insert(doc.getKey(), doc);
160+
}
169161
}
162+
} catch (ExecutionException e) {
163+
fail(e, "Failed to decode documents from SQLite");
170164
}
165+
171166
return matchingDocuments;
172167
}
173168

174169
private String pathForKey(DocumentKey key) {
175170
return EncodedPath.encode(key.getPath());
176171
}
177172

173+
private @Nullable Document processDocumentResult(byte[] rawDocument, Query query) {
174+
MaybeDocument maybeDoc = decodeMaybeDocument(rawDocument);
175+
if (!(maybeDoc instanceof Document)) {
176+
return null;
177+
}
178+
if (!query.matches((Document) maybeDoc)) {
179+
return null;
180+
}
181+
return (Document) maybeDoc;
182+
}
183+
178184
private MaybeDocument decodeMaybeDocument(byte[] bytes) {
179185
try {
180186
return serializer.decodeMaybeDocument(
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.firestore.util;
16+
17+
import static com.google.firebase.firestore.util.Assert.hardAssert;
18+
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.concurrent.BlockingQueue;
22+
import java.util.concurrent.Callable;
23+
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.PriorityBlockingQueue;
25+
import java.util.concurrent.Semaphore;
26+
27+
/**
28+
* A queue for parallel execution of independent tasks.
29+
*
30+
* <p>All tasks must return the same result type T and results can be be fetched via
31+
* `awaitResults()`. It is an error to enqueue tasks after invoking `awaitResults()`.
32+
*
33+
* <p>This class is not thread-safe. All public methods must be called from the same thread.
34+
*/
35+
public class TaskQueue<T> {
36+
private BlockingQueue<TaskResult<T>> taskResults = new PriorityBlockingQueue<>();
37+
private Semaphore completedTasks = new Semaphore(0);
38+
private int totalTaskCount = 0;
39+
private boolean acceptsTasks = true;
40+
41+
/** A single result from a background task. Encapsulates the task number to allow for sorting. */
42+
private static class TaskResult<T> implements Comparable<TaskResult<T>> {
43+
final int numTask;
44+
final T result;
45+
final Exception exception;
46+
47+
private TaskResult(int numTask, T result) {
48+
this.numTask = numTask;
49+
this.result = result;
50+
this.exception = null;
51+
}
52+
53+
private TaskResult(int numTask, Exception exception) {
54+
this.numTask = numTask;
55+
this.result = null;
56+
this.exception = exception;
57+
}
58+
59+
@Override
60+
public int compareTo(TaskResult<T> other) {
61+
return Integer.compare(numTask, other.numTask);
62+
}
63+
}
64+
65+
/** Enqueue a task on Android's THREAD_POOL_EXECUTOR. */
66+
public void enqueueInBackground(Callable<T> task) {
67+
hardAssert(acceptsTasks, "enqueueInBackground() called after awaitResults()");
68+
69+
int currentTask = ++totalTaskCount;
70+
Executors.BACKGROUND_EXECUTOR.execute(
71+
() -> {
72+
try {
73+
T result = task.call();
74+
taskResults.add(new TaskResult<>(currentTask, result));
75+
} catch (Exception e) {
76+
taskResults.add(new TaskResult<>(currentTask, e));
77+
}
78+
completedTasks.release();
79+
});
80+
}
81+
82+
/** Executes a task inline, allowing for more efficient execution of small tasks. */
83+
public void enqueueInline(Callable<T> callable) {
84+
hardAssert(acceptsTasks, "enqueueInBackground() called after awaitResults()");
85+
86+
int currentTask = ++totalTaskCount;
87+
try {
88+
T result = callable.call();
89+
taskResults.add(new TaskResult<>(currentTask, result));
90+
} catch (Exception e) {
91+
taskResults.add(new TaskResult<>(currentTask, e));
92+
}
93+
completedTasks.release();
94+
}
95+
96+
/**
97+
* Blocks on the execution of all tasks. Returns a list of results in the order that tasks where
98+
* added.
99+
*/
100+
public List<T> awaitResults() throws ExecutionException {
101+
acceptsTasks = false;
102+
103+
List<T> allResults = new ArrayList<>(totalTaskCount);
104+
try {
105+
completedTasks.acquire(totalTaskCount);
106+
while (!taskResults.isEmpty()) {
107+
TaskResult<T> currentResult = taskResults.take();
108+
109+
if (currentResult.exception != null) {
110+
throw new ExecutionException("Unhandled exception in task", currentResult.exception);
111+
} else {
112+
allResults.add(currentResult.result);
113+
}
114+
}
115+
} catch (InterruptedException e) {
116+
Thread.currentThread().interrupt();
117+
}
118+
return allResults;
119+
}
120+
}

firebase-firestore/src/main/java/com/google/firebase/firestore/util/Util.java

Lines changed: 0 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
import android.os.Looper;
1919
import androidx.annotation.Nullable;
2020
import com.google.android.gms.tasks.Continuation;
21-
import com.google.android.gms.tasks.Task;
22-
import com.google.android.gms.tasks.Tasks;
2321
import com.google.cloud.datastore.core.number.NumberComparisonHelper;
2422
import com.google.firebase.firestore.FieldPath;
2523
import com.google.firebase.firestore.FirebaseFirestoreException;
@@ -33,7 +31,6 @@
3331
import java.util.Comparator;
3432
import java.util.List;
3533
import java.util.Random;
36-
import java.util.concurrent.Semaphore;
3734

3835
/** A utility class for Firestore */
3936
public class Util {
@@ -225,39 +222,4 @@ public static void crashMainThread(RuntimeException exception) {
225222
throw exception;
226223
});
227224
}
228-
229-
/**
230-
* Waits for the given Task to complete.
231-
*
232-
* <p>Similar to Tasks.await() but can be called from the main thread to support unit testing.
233-
*/
234-
public static <TResult> TResult await(Task<TResult> task) {
235-
Semaphore semaphore = new Semaphore(0);
236-
task.addOnCompleteListener(Executors.BACKGROUND_EXECUTOR, t -> semaphore.release());
237-
try {
238-
semaphore.acquire();
239-
} catch (InterruptedException e) {
240-
Thread.currentThread().interrupt();
241-
}
242-
return task.getResult();
243-
}
244-
245-
/**
246-
* Returns a Task with a list of Tasks that completes successfully when all of the specified Tasks
247-
* complete.
248-
*
249-
* <p>Similar to Tasks.whenAllComplete() but does not schedule the completion on the main thread.
250-
*/
251-
public static <TResult> Task<List<TResult>> whenAllComplete(List<Task<TResult>> tasks) {
252-
return Tasks.whenAll(tasks)
253-
.continueWithTask(
254-
Executors.BACKGROUND_EXECUTOR,
255-
t -> {
256-
List<TResult> results = new ArrayList<>();
257-
for (Task<TResult> task : tasks) {
258-
results.add(task.getResult());
259-
}
260-
return Tasks.forResult(results);
261-
});
262-
}
263225
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
// Copyright 2019 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.firestore.util;
16+
17+
import static org.junit.Assert.assertEquals;
18+
19+
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.Semaphore;
23+
import org.junit.Test;
24+
import org.robolectric.annotation.Config;
25+
26+
@org.junit.runner.RunWith(org.robolectric.RobolectricTestRunner.class)
27+
@Config(manifest = Config.NONE)
28+
public class TaskQueueTest {
29+
30+
@Test
31+
public void canEnqueueInBackground() throws ExecutionException {
32+
TaskQueue<Integer> queue = new TaskQueue<>();
33+
queue.enqueueInBackground(() -> 1);
34+
queue.enqueueInBackground(() -> 2);
35+
queue.enqueueInBackground(() -> 3);
36+
List<Integer> result = queue.awaitResults();
37+
assertEquals(Arrays.asList(1, 2, 3), result);
38+
}
39+
40+
@Test
41+
public void canEnqueueInline() throws ExecutionException {
42+
TaskQueue<Integer> queue = new TaskQueue<>();
43+
queue.enqueueInline(() -> 1);
44+
queue.enqueueInline(() -> 2);
45+
queue.enqueueInline(() -> 3);
46+
List<Integer> result = queue.awaitResults();
47+
assertEquals(Arrays.asList(1, 2, 3), result);
48+
}
49+
50+
@Test(expected = ExecutionException.class)
51+
public void enqueueInBackgroundForwardsException() throws ExecutionException {
52+
TaskQueue<Integer> queue = new TaskQueue<>();
53+
queue.enqueueInBackground(() -> 1);
54+
queue.enqueueInBackground(
55+
() -> {
56+
throw new Exception("foo");
57+
});
58+
queue.enqueueInBackground(() -> 3);
59+
queue.awaitResults();
60+
}
61+
62+
@Test(expected = ExecutionException.class)
63+
public void enqueueInlineForwardsException() throws ExecutionException {
64+
TaskQueue<Integer> queue = new TaskQueue<>();
65+
queue.enqueueInline(() -> 1);
66+
queue.enqueueInline(
67+
() -> {
68+
throw new Exception("foo");
69+
});
70+
queue.enqueueInline(() -> 3);
71+
queue.awaitResults();
72+
}
73+
74+
@Test
75+
public void enqueuePreservesOrder() throws ExecutionException {
76+
Semaphore firstTaskCompleted = new Semaphore(0);
77+
Semaphore secondTaskCompleted = new Semaphore(0);
78+
Semaphore thirdTaskCompleted = new Semaphore(0);
79+
80+
TaskQueue<Integer> queue = new TaskQueue<>();
81+
queue.enqueueInBackground(
82+
() -> {
83+
thirdTaskCompleted.acquire();
84+
return 1;
85+
});
86+
queue.enqueueInBackground(
87+
() -> {
88+
secondTaskCompleted.acquire();
89+
thirdTaskCompleted.release();
90+
return 2;
91+
});
92+
queue.enqueueInBackground(
93+
() -> {
94+
firstTaskCompleted.release();
95+
return 3;
96+
});
97+
queue.enqueueInline(
98+
() -> {
99+
firstTaskCompleted.acquire();
100+
secondTaskCompleted.release();
101+
return 4;
102+
});
103+
104+
List<Integer> result = queue.awaitResults();
105+
assertEquals(Arrays.asList(1, 2, 3, 4), result);
106+
}
107+
}

0 commit comments

Comments
 (0)