Skip to content

Commit ff137cc

Browse files
authored
Make enqueue() return a Task<>. (#175)
* Make enqueue(Runnable) return Task. * Add enqueue(Callable) which returns Task. * Add enqueuAndForget(Runnable) which returns void. * Update usages to make use of new ability (more closely matching TypeScript).
1 parent 83fa490 commit ff137cc

File tree

6 files changed

+88
-66
lines changed

6 files changed

+88
-66
lines changed

firebase-firestore/src/androidTest/java/com/google/firebase/firestore/remote/StreamTest.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -89,9 +89,9 @@ public void onWriteResponse(
8989
/** Puts the write in a state that accepts mutations. */
9090
private StreamStatusCallback waitForOpenStream(AsyncQueue testQueue, WriteStream writeStream) {
9191
StreamStatusCallback callback = new StreamStatusCallback();
92-
testQueue.enqueue(() -> writeStream.start(callback));
92+
testQueue.enqueueAndForget(() -> writeStream.start(callback));
9393
waitFor(callback.openSemaphore);
94-
testQueue.enqueue(writeStream::writeHandshake);
94+
testQueue.enqueueAndForget(writeStream::writeHandshake);
9595
waitFor(callback.handshakeSemaphore);
9696
return callback;
9797
}
@@ -105,7 +105,7 @@ public void testWatchStreamStopBeforeHandshake() throws Exception {
105105
final WatchStream watchStream = datastore.createWatchStream();
106106
StreamStatusCallback streamCallback = new StreamStatusCallback() {};
107107

108-
testQueue.enqueue(() -> watchStream.start(streamCallback));
108+
testQueue.enqueueAndForget(() -> watchStream.start(streamCallback));
109109
waitFor(streamCallback.openSemaphore);
110110

111111
// Stop must not call watchStreamStreamDidClose because the full implementation of the delegate
@@ -137,21 +137,21 @@ public void onWriteResponse(
137137
super.onWriteResponse(commitVersion, mutationResults);
138138
}
139139
};
140-
testQueue.enqueue(() -> writeStream.start(streamCallback));
140+
testQueue.enqueueAndForget(() -> writeStream.start(streamCallback));
141141
waitFor(streamCallback.openSemaphore);
142142

143143
// Writing before the handshake should throw
144-
testQueue.enqueue(
144+
testQueue.enqueueAndForget(
145145
() -> {
146146
assertThrows(Throwable.class, () -> writeStream.writeMutations(mutations));
147147
});
148148

149149
// Handshake should always be called
150-
testQueue.enqueue(writeStream::writeHandshake);
150+
testQueue.enqueueAndForget(writeStream::writeHandshake);
151151
waitFor(streamCallback.handshakeSemaphore);
152152

153153
// Now writes should succeed
154-
testQueue.enqueue(() -> writeStream.writeMutations(mutations));
154+
testQueue.enqueueAndForget(() -> writeStream.writeMutations(mutations));
155155
waitFor(streamCallback.responseReceivedSemaphore);
156156

157157
testQueue.runSync(writeStream::stop);
@@ -167,7 +167,7 @@ public void testWriteStreamStopPartial() throws Exception {
167167
final WriteStream writeStream = datastore.createWriteStream();
168168

169169
StreamStatusCallback streamCallback = new StreamStatusCallback() {};
170-
testQueue.enqueue(() -> writeStream.start(streamCallback));
170+
testQueue.enqueueAndForget(() -> writeStream.start(streamCallback));
171171
waitFor(streamCallback.openSemaphore);
172172

173173
// Don't start the handshake
@@ -203,7 +203,7 @@ public void testStreamClosesWhenIdle() throws Exception {
203203
final WriteStream writeStream = datastore.createWriteStream();
204204
StreamStatusCallback callback = waitForOpenStream(testQueue, writeStream);
205205

206-
testQueue.enqueue(
206+
testQueue.enqueueAndForget(
207207
() -> {
208208
writeStream.markIdle();
209209
assertTrue(testQueue.containsDelayedTask(TimerId.WRITE_STREAM_IDLE));

firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ static FirebaseFirestore newInstance(
8080

8181
CredentialsProvider provider = new FirebaseAuthCredentialsProvider(app);
8282

83-
queue.enqueue(
83+
queue.enqueueAndForget(
8484
new Runnable() {
8585
@Override
8686
public void run() {

firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java

Lines changed: 24 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import com.google.firebase.firestore.local.SQLitePersistence;
2626
import com.google.firebase.firestore.model.Document;
2727
import com.google.firebase.firestore.model.DocumentKey;
28-
import com.google.firebase.firestore.model.MaybeDocument;
2928
import com.google.firebase.firestore.model.mutation.Mutation;
3029
import com.google.firebase.firestore.model.mutation.MutationBatchResult;
3130
import com.google.firebase.firestore.remote.Datastore;
@@ -73,14 +72,14 @@ public FirestoreClient(
7372
hardAssert(!firstUser.getTask().isComplete(), "Already fulfilled first user task");
7473
firstUser.setResult(user);
7574
} else {
76-
asyncQueue.enqueue(() -> syncEngine.handleUserChange(user));
75+
asyncQueue.enqueueAndForget(() -> syncEngine.handleUserChange(user));
7776
}
7877
});
7978

8079
// Defer initialization until we get the current user from the userChangeListener. This is
8180
// guaranteed to be synchronously dispatched onto our worker queue, so we will be initialized
8281
// before any subsequently queued work runs.
83-
asyncQueue.enqueue(
82+
asyncQueue.enqueueAndForget(
8483
() -> {
8584
try {
8685
// Block on initial user being available
@@ -93,74 +92,58 @@ public FirestoreClient(
9392
}
9493

9594
public Task<Void> disableNetwork() {
96-
final TaskCompletionSource<Void> source = new TaskCompletionSource<>();
97-
asyncQueue.enqueue(
98-
() -> {
99-
remoteStore.disableNetwork();
100-
source.setResult(null);
101-
});
102-
return source.getTask();
95+
return asyncQueue.enqueue(() -> remoteStore.disableNetwork());
10396
}
10497

10598
public Task<Void> enableNetwork() {
106-
final TaskCompletionSource<Void> source = new TaskCompletionSource<>();
107-
asyncQueue.enqueue(
108-
() -> {
109-
remoteStore.enableNetwork();
110-
source.setResult(null);
111-
});
112-
return source.getTask();
99+
return asyncQueue.enqueue(() -> remoteStore.enableNetwork());
113100
}
114101

115102
/** Shuts down this client, cancels all writes / listeners, and releases all resources. */
116103
public Task<Void> shutdown() {
117-
final TaskCompletionSource<Void> source = new TaskCompletionSource<>();
118104
credentialsProvider.removeUserChangeListener();
119-
asyncQueue.enqueue(
105+
return asyncQueue.enqueue(
120106
() -> {
121107
remoteStore.shutdown();
122108
persistence.shutdown();
123-
source.setResult(null);
124109
});
125-
return source.getTask();
126110
}
127111

128112
/** Starts listening to a query. */
129113
public QueryListener listen(
130114
Query query, ListenOptions options, EventListener<ViewSnapshot> listener) {
131115
QueryListener queryListener = new QueryListener(query, options, listener);
132-
asyncQueue.enqueue(() -> eventManager.addQueryListener(queryListener));
116+
asyncQueue.enqueueAndForget(() -> eventManager.addQueryListener(queryListener));
133117
return queryListener;
134118
}
135119

136120
/** Stops listening to a query previously listened to. */
137121
public void stopListening(QueryListener listener) {
138-
asyncQueue.enqueue(() -> eventManager.removeQueryListener(listener));
122+
asyncQueue.enqueueAndForget(() -> eventManager.removeQueryListener(listener));
139123
}
140124

141125
public Task<Document> getDocumentFromLocalCache(DocumentKey docKey) {
142-
final TaskCompletionSource<Document> source = new TaskCompletionSource<>();
143-
asyncQueue.enqueue(
144-
() -> {
145-
MaybeDocument maybeDoc = localStore.readDocument(docKey);
146-
if (maybeDoc instanceof Document) {
147-
source.setResult((Document) maybeDoc);
148-
} else {
149-
source.setException(
150-
new FirebaseFirestoreException(
126+
return asyncQueue
127+
.enqueue(
128+
() -> {
129+
return localStore.readDocument(docKey);
130+
})
131+
.continueWith(
132+
(maybeDoc) -> {
133+
if (maybeDoc.getResult() instanceof Document) {
134+
return (Document) maybeDoc.getResult();
135+
} else {
136+
throw new FirebaseFirestoreException(
151137
"Failed to get document from cache. (However, this document may exist on the "
152138
+ "server. Run again without setting source to CACHE to attempt "
153139
+ "to retrieve the document from the server.)",
154-
Code.UNAVAILABLE));
155-
}
156-
});
157-
158-
return source.getTask();
140+
Code.UNAVAILABLE);
141+
}
142+
});
159143
}
160144

161145
public Task<ViewSnapshot> getDocumentsFromLocalCache(Query query) {
162-
final TaskCompletionSource<ViewSnapshot> source = new TaskCompletionSource<>();
163-
asyncQueue.enqueue(
146+
return asyncQueue.enqueue(
164147
() -> {
165148
ImmutableSortedMap<DocumentKey, Document> docs = localStore.executeQuery(query);
166149

@@ -170,16 +153,14 @@ public Task<ViewSnapshot> getDocumentsFromLocalCache(Query query) {
170153
new ImmutableSortedSet<DocumentKey>(
171154
Collections.emptyList(), DocumentKey::compareTo));
172155
View.DocumentChanges viewDocChanges = view.computeDocChanges(docs);
173-
source.setResult(view.applyChanges(viewDocChanges).getSnapshot());
156+
return view.applyChanges(viewDocChanges).getSnapshot();
174157
});
175-
176-
return source.getTask();
177158
}
178159

179160
/** Writes mutations. The returned task will be notified when it's written to the backend. */
180161
public Task<Void> write(final List<Mutation> mutations) {
181162
final TaskCompletionSource<Void> source = new TaskCompletionSource<>();
182-
asyncQueue.enqueue(() -> syncEngine.writeMutations(mutations, source));
163+
asyncQueue.enqueueAndForget(() -> syncEngine.writeMutations(mutations, source));
183164
return source.getTask();
184165
}
185166

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/AbstractStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public void start(CallbackT listener) {
197197
state = State.Auth;
198198

199199
// Make sure onOpen call is asynchronously
200-
workerQueue.enqueue(
200+
workerQueue.enqueueAndForget(
201201
() -> {
202202
// The stream could have been stopped between now and scheduling the onOpen.
203203
// Alternatively, onError could have been called (and the listener removed) before

firebase-firestore/src/main/java/com/google/firebase/firestore/util/AsyncQueue.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.Semaphore;
2727
import java.util.concurrent.ThreadFactory;
2828
import java.util.concurrent.TimeUnit;
29+
import javax.annotation.CheckReturnValue;
2930

3031
/** A helper class that allows to schedule/queue Runnables on a single threaded background queue. */
3132
public class AsyncQueue {
@@ -256,18 +257,58 @@ public void verifyIsCurrentThread() {
256257
}
257258

258259
/**
259-
* Queue and run this task immediately after every other already queued task.
260+
* Queue and run this Callable task immediately after every other already queued task.
260261
*
261262
* @param task The task to run.
263+
* @return A Task object that is resolved after the enqueued operation has completed, with the
264+
* return value of the operation.
262265
*/
263-
public void enqueue(Runnable task) {
266+
@CheckReturnValue
267+
public <T> Task<T> enqueue(Callable<T> task) {
268+
final TaskCompletionSource<T> completionSource = new TaskCompletionSource<>();
264269
try {
265-
executor.execute(task);
270+
executor.execute(
271+
() -> {
272+
try {
273+
completionSource.setResult(task.call());
274+
} catch (Exception e) {
275+
completionSource.setException(e);
276+
throw new RuntimeException(e);
277+
}
278+
});
266279
} catch (RejectedExecutionException e) {
267280
// The only way we can get here is if the AsyncQueue has panicked and we're now racing with
268281
// the post to the main looper that will crash the app.
269282
Logger.warn(AsyncQueue.class.getSimpleName(), "Refused to enqueue task after panic");
270283
}
284+
return completionSource.getTask();
285+
}
286+
287+
/**
288+
* Queue and run this Runnable task immediately after every other already queued task.
289+
*
290+
* @param task The task to run.
291+
* @return A Task object that is resolved after the enqueued operation has completed.
292+
*/
293+
@CheckReturnValue
294+
public Task<Void> enqueue(Runnable task) {
295+
return enqueue(
296+
() -> {
297+
task.run();
298+
return null;
299+
});
300+
}
301+
302+
/**
303+
* Queue and run this Runnable task immediately after every other already queued task. Unlike
304+
* enqueue(), returns void instead of a Task Object for use when we have no need to "wait" on the
305+
* task completing.
306+
*
307+
* @param task The task to run.
308+
*/
309+
@SuppressWarnings("CheckReturnValue")
310+
public void enqueueAndForget(Runnable task) {
311+
enqueue(task);
271312
}
272313

273314
/**
@@ -331,7 +372,7 @@ public void panic(Throwable t) {
331372
@VisibleForTesting
332373
public void runSync(Runnable task) throws InterruptedException {
333374
Semaphore done = new Semaphore(0);
334-
enqueue(
375+
enqueueAndForget(
335376
() -> {
336377
task.run();
337378
done.release();

firebase-firestore/src/test/java/com/google/firebase/firestore/util/AsyncQueueTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,9 @@ private void waitForExpectedSteps() {
6262
@Test
6363
public void canScheduleTasksInTheFuture() {
6464
expectedSteps = Arrays.asList(1, 2, 3, 4);
65-
queue.enqueue(runnableForStep(1));
65+
queue.enqueueAndForget(runnableForStep(1));
6666
queue.enqueueAfterDelay(TIMER_ID_1, 5, runnableForStep(4));
67-
queue.enqueue(runnableForStep(2));
67+
queue.enqueueAndForget(runnableForStep(2));
6868
queue.enqueueAfterDelay(TIMER_ID_2, 1, runnableForStep(3));
6969
waitForExpectedSteps();
7070
}
@@ -73,9 +73,9 @@ public void canScheduleTasksInTheFuture() {
7373
public void canCancelDelayedTasks() {
7474
expectedSteps = Arrays.asList(1, 3);
7575
// Queue everything from the queue to ensure nothing completes before we cancel.
76-
queue.enqueue(
76+
queue.enqueueAndForget(
7777
() -> {
78-
queue.enqueue(runnableForStep(1));
78+
queue.enqueueAndForget(runnableForStep(1));
7979
DelayedTask step2Timer = queue.enqueueAfterDelay(TIMER_ID_1, 1, runnableForStep(2));
8080
queue.enqueueAfterDelay(TIMER_ID_3, 5, runnableForStep(3));
8181

@@ -89,22 +89,22 @@ public void canCancelDelayedTasks() {
8989

9090
@Test
9191
public void canManuallyDrainAllDelayedTasksForTesting() throws Exception {
92-
queue.enqueue(runnableForStep(1));
92+
queue.enqueueAndForget(runnableForStep(1));
9393
queue.enqueueAfterDelay(TIMER_ID_1, 20, runnableForStep(4));
9494
queue.enqueueAfterDelay(TIMER_ID_2, 10, runnableForStep(3));
95-
queue.enqueue(runnableForStep(2));
95+
queue.enqueueAndForget(runnableForStep(2));
9696

9797
queue.runDelayedTasksUntil(TimerId.ALL);
9898
assertEquals(Arrays.asList(1, 2, 3, 4), completedSteps);
9999
}
100100

101101
@Test
102102
public void canManuallyDrainSpecificDelayedTasksForTesting() throws Exception {
103-
queue.enqueue(runnableForStep(1));
103+
queue.enqueueAndForget(runnableForStep(1));
104104
queue.enqueueAfterDelay(TIMER_ID_1, 20, runnableForStep(5));
105105
queue.enqueueAfterDelay(TIMER_ID_2, 10, runnableForStep(3));
106106
queue.enqueueAfterDelay(TIMER_ID_3, 15, runnableForStep(4));
107-
queue.enqueue(runnableForStep(2));
107+
queue.enqueueAndForget(runnableForStep(2));
108108

109109
queue.runDelayedTasksUntil(TIMER_ID_3);
110110
assertEquals(Arrays.asList(1, 2, 3, 4), completedSteps);

0 commit comments

Comments
 (0)