Skip to content

Commit 74603b8

Browse files
committed
awaitPendingWrites initial revision
1 parent 75d0130 commit 74603b8

File tree

10 files changed

+138
-1
lines changed

10 files changed

+138
-1
lines changed

firebase-firestore/src/androidTest/java/com/google/firebase/firestore/FirestoreTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1096,4 +1096,25 @@ public void testShutdownCalledMultipleTimes() {
10961096

10971097
expectError(() -> waitFor(reference.get()), expectedMessage);
10981098
}
1099+
1100+
@Test
1101+
public void testAwaitPendingWritesResolves() {
1102+
DocumentReference documentReference = testCollection("abc").document("123");
1103+
FirebaseFirestore firestore = documentReference.getFirestore();
1104+
Map<String, Object> data = map("foo", "bar");
1105+
1106+
waitFor(firestore.disableNetwork());
1107+
Task<Void> awaitsPendingWrites1 = firestore.awaitPendingWrites();
1108+
Task<Void> pendingWrite = documentReference.set(data);
1109+
Task<Void> awaitsPendingWrites2 = firestore.awaitPendingWrites();
1110+
1111+
assertTrue(!awaitsPendingWrites1.isComplete());
1112+
assertTrue(!pendingWrite.isComplete());
1113+
assertTrue(!awaitsPendingWrites2.isComplete());
1114+
1115+
waitFor(firestore.enableNetwork());
1116+
waitFor(pendingWrite);
1117+
waitFor(awaitsPendingWrites1);
1118+
waitFor(awaitsPendingWrites2);
1119+
}
10991120
}

firebase-firestore/src/main/java/com/google/firebase/firestore/FirebaseFirestore.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,18 @@ Task<Void> shutdown() {
362362
return shutdownInternal();
363363
}
364364

365+
/**
366+
* Wait for server acknowledgement for all pending writes existing at the time of calling this
367+
* method.
368+
*
369+
* <p>Both acceptance and rejection count as server acknowledgement.
370+
*
371+
* @return A {@link Task} which resolves when all pending writes are acknowledged by the server.
372+
*/
373+
Task<Void> awaitPendingWrites() {
374+
return client.awaitPendingWrites();
375+
}
376+
365377
@VisibleForTesting
366378
AsyncQueue getAsyncQueue() {
367379
return asyncQueue;

firebase-firestore/src/main/java/com/google/firebase/firestore/core/FirestoreClient.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,24 @@ public <TResult> Task<TResult> transaction(
224224
() -> syncEngine.transaction(asyncQueue, updateFunction, retries));
225225
}
226226

227+
/**
228+
* Returns a task resolves when all the pending writes at the time when this method is called
229+
* received server acknowledgement. An acknowledgement can be either acceptance or rejections.
230+
*/
231+
public Task<Void> awaitPendingWrites() {
232+
this.verifyNotShutdown();
233+
if (!remoteStore.canUseNetwork()) {
234+
Logger.warn(
235+
LOG_TAG,
236+
"Network is disabled, the Task created to wait for all writes getting"
237+
+ " acknowledged by server will not complete until network is enabled.");
238+
}
239+
240+
final TaskCompletionSource<Void> source = new TaskCompletionSource<>();
241+
asyncQueue.enqueueAndForget(() -> syncEngine.registerPendingWritesTask(source));
242+
return source.getTask();
243+
}
244+
227245
private void initialize(Context context, User user, boolean usePersistence, long cacheSizeBytes) {
228246
// Note: The initialization work must all be synchronous (we can't dispatch more work) since
229247
// external write/listen operations could get queued to run before that subsequent work

firebase-firestore/src/main/java/com/google/firebase/firestore/core/SyncEngine.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.android.gms.tasks.TaskCompletionSource;
2424
import com.google.android.gms.tasks.Tasks;
2525
import com.google.common.base.Function;
26+
import com.google.common.collect.Lists;
2627
import com.google.firebase.database.collection.ImmutableSortedMap;
2728
import com.google.firebase.database.collection.ImmutableSortedSet;
2829
import com.google.firebase.firestore.FirebaseFirestoreException;
@@ -133,6 +134,9 @@ interface SyncEngineCallback {
133134
/** Stores user completion blocks, indexed by user and batch ID. */
134135
private final Map<User, Map<Integer, TaskCompletionSource<Void>>> mutationUserCallbacks;
135136

137+
/** Stores user callbacks waiting for all pending writes to be acknowledged. */
138+
private final Map<Integer, List<TaskCompletionSource<Void>>> pendingWritesCallbacks;
139+
136140
/** Used for creating the target IDs for the listens used to resolve limbo documents. */
137141
private final TargetIdGenerator targetIdGenerator;
138142

@@ -154,6 +158,8 @@ public SyncEngine(LocalStore localStore, RemoteStore remoteStore, User initialUs
154158
mutationUserCallbacks = new HashMap<>();
155159
targetIdGenerator = TargetIdGenerator.forSyncEngine();
156160
currentUser = initialUser;
161+
162+
pendingWritesCallbacks = new HashMap<>();
157163
}
158164

159165
public void setCallback(SyncEngineCallback callback) {
@@ -407,6 +413,8 @@ public void handleSuccessfulWrite(MutationBatchResult mutationBatchResult) {
407413
// they consistently happen before listen events.
408414
notifyUser(mutationBatchResult.getBatch().getBatchId(), /*status=*/ null);
409415

416+
resolveTasksAwaitingForPendingWritesIfAny(mutationBatchResult.getBatch().getBatchId());
417+
410418
ImmutableSortedMap<DocumentKey, MaybeDocument> changes =
411419
localStore.acknowledgeBatch(mutationBatchResult);
412420

@@ -427,9 +435,41 @@ public void handleRejectedWrite(int batchId, Status status) {
427435
// they consistently happen before listen events.
428436
notifyUser(batchId, status);
429437

438+
resolveTasksAwaitingForPendingWritesIfAny(batchId);
439+
430440
emitNewSnapsAndNotifyLocalStore(changes, /*remoteEvent=*/ null);
431441
}
432442

443+
/**
444+
* Takes a snapshot of current local mutation queue, and register a user task which will resolve
445+
* when all those mutations are either accepted or rejected by the server.
446+
*/
447+
public void registerPendingWritesTask(TaskCompletionSource<Void> userTask) {
448+
int largestPendingBatchId = localStore.getHighestUnacknowledgedBatchId();
449+
450+
if (largestPendingBatchId == 0) {
451+
// Complete the task right away if there is no pending writes at the moment.
452+
userTask.setResult(null);
453+
}
454+
455+
if (pendingWritesCallbacks.containsKey(largestPendingBatchId)) {
456+
pendingWritesCallbacks.get(largestPendingBatchId).add(userTask);
457+
} else {
458+
pendingWritesCallbacks.put(largestPendingBatchId, Lists.newArrayList(userTask));
459+
}
460+
}
461+
462+
/** Resolves tasks waiting for this batch id to get acknowledged by server, if there is any. */
463+
private void resolveTasksAwaitingForPendingWritesIfAny(int batchId) {
464+
if (pendingWritesCallbacks.containsKey(batchId)) {
465+
for (TaskCompletionSource<Void> task : pendingWritesCallbacks.get(batchId)) {
466+
task.setResult(null);
467+
}
468+
469+
pendingWritesCallbacks.remove(batchId);
470+
}
471+
}
472+
433473
/** Resolves the task corresponding to this write result. */
434474
private void notifyUser(int batchId, @Nullable Status status) {
435475
Map<Integer, TaskCompletionSource<Void>> userTasks = mutationUserCallbacks.get(currentUser);

firebase-firestore/src/main/java/com/google/firebase/firestore/local/LocalStore.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,14 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> rejectBatch(int batchId) {
282282
});
283283
}
284284

285+
/**
286+
* Returns the largest (latest) batch id in mutation queue that is pending server response.
287+
* Returns 0 if the queue is empty.
288+
*/
289+
public int getHighestUnacknowledgedBatchId() {
290+
return mutationQueue.getLargestUnacknowledgedBatchId();
291+
}
292+
285293
/** Returns the last recorded stream token for the current user. */
286294
public ByteString getLastStreamToken() {
287295
return mutationQueue.getLastStreamToken();

firebase-firestore/src/main/java/com/google/firebase/firestore/local/MemoryMutationQueue.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,11 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
187187
return queue.size() > index ? queue.get(index) : null;
188188
}
189189

190+
@Override
191+
public int getLargestUnacknowledgedBatchId() {
192+
return queue.size() == 0 ? 0 : nextBatchId - 1;
193+
}
194+
190195
@Override
191196
public List<MutationBatch> getAllMutationBatches() {
192197
return Collections.unmodifiableList(queue);

firebase-firestore/src/main/java/com/google/firebase/firestore/local/MutationQueue.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,12 @@ MutationBatch addMutationBatch(
7373
@Nullable
7474
MutationBatch getNextMutationBatchAfterBatchId(int batchId);
7575

76+
/**
77+
* @return The largest (latest) batch id in mutation queue for the current user that is pending
78+
* server response, 0 if the queue is empty.
79+
*/
80+
int getLargestUnacknowledgedBatchId();
81+
7682
/** Returns all mutation batches in the mutation queue. */
7783
// TODO: PERF: Current consumer only needs mutated keys; if we can provide that
7884
// cheaply, we should replace this.

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,16 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
249249
.firstValue(row -> decodeInlineMutationBatch(row.getInt(0), row.getBlob(1)));
250250
}
251251

252+
@Override
253+
public int getLargestUnacknowledgedBatchId() {
254+
if (isEmpty()) {
255+
return 0;
256+
}
257+
return db.query("SELECT MAX(batch_id) FROM mutations " + "WHERE uid = ?")
258+
.binding(uid)
259+
.firstValue(row -> row.getInt(0));
260+
}
261+
252262
@Override
253263
public List<MutationBatch> getAllMutationBatches() {
254264
List<MutationBatch> result = new ArrayList<>();

firebase-firestore/src/main/java/com/google/firebase/firestore/remote/RemoteStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -478,7 +478,7 @@ private void handleWatchStreamClose(Status status) {
478478
}
479479
}
480480

481-
private boolean canUseNetwork() {
481+
public boolean canUseNetwork() {
482482
// PORTING NOTE: This method exists mostly because web also has to take into account primary
483483
// vs. secondary state.
484484
return networkEnabled;

firebase-firestore/src/test/java/com/google/firebase/firestore/local/LocalStoreTestCase.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1143,4 +1143,21 @@ public void testHandlesPatchMutationWithTransformThenRemoteEvent() {
11431143
assertChanged(doc("foo/bar", 1, map("sum", 1), Document.DocumentState.LOCAL_MUTATIONS));
11441144
assertContains(doc("foo/bar", 1, map("sum", 1), Document.DocumentState.LOCAL_MUTATIONS));
11451145
}
1146+
1147+
@Test
1148+
public void testGetHighestUnacknowledgedBatchIdReturnsExpectedResult() {
1149+
assertEquals(0, localStore.getHighestUnacknowledgedBatchId());
1150+
1151+
writeMutation(setMutation("foo/bar", map("abc", 123)));
1152+
assertEquals(1, localStore.getHighestUnacknowledgedBatchId());
1153+
1154+
writeMutation(patchMutation("foo/bar", map("abc", 321)));
1155+
assertEquals(2, localStore.getHighestUnacknowledgedBatchId());
1156+
1157+
acknowledgeMutation(1);
1158+
assertEquals(2, localStore.getHighestUnacknowledgedBatchId());
1159+
1160+
rejectMutation();
1161+
assertEquals(0, localStore.getHighestUnacknowledgedBatchId());
1162+
}
11461163
}

0 commit comments

Comments
 (0)