Skip to content

Commit 655e962

Browse files
committed
Handle reading large mutation batches
1 parent 557749e commit 655e962

File tree

2 files changed

+91
-18
lines changed

2 files changed

+91
-18
lines changed

firebase-firestore/src/androidTest/java/com/google/firebase/firestore/WriteBatchTest.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@
3232
import com.google.firebase.firestore.FirebaseFirestoreException.Code;
3333
import com.google.firebase.firestore.testutil.EventAccumulator;
3434
import com.google.firebase.firestore.testutil.IntegrationTestUtil;
35+
import com.google.firebase.firestore.util.Util;
3536
import java.util.Arrays;
37+
import java.util.HashMap;
38+
import java.util.Map;
3639
import org.junit.After;
3740
import org.junit.Test;
3841
import org.junit.runner.RunWith;
@@ -253,4 +256,28 @@ public void testCanWriteTheSameDocumentMultipleTimes() {
253256
assertNotNull(when);
254257
assertEquals(map("a", 1L, "b", 2L, "when", when), serverSnap.getData());
255258
}
259+
260+
@Test
261+
public void testCanWriteVeryLargeBatches() {
262+
String a = Character.toString('a');
263+
StringBuilder buf = new StringBuilder(1000);
264+
for (int i = 0; i < 1000; i++) {
265+
buf.append(a);
266+
}
267+
String kb = buf.toString();
268+
Map<String, Object> values = new HashMap<>();
269+
for (int j = 0; j < 1000; j++) {
270+
values.put(Util.autoId(), kb);
271+
}
272+
273+
DocumentReference doc = testDocument();
274+
WriteBatch batch = doc.getFirestore().batch();
275+
276+
batch.set(doc, values);
277+
for (int i = 0; i < 2; i++) {
278+
batch.update(doc, values);
279+
}
280+
281+
waitFor(batch.commit());
282+
}
256283
}

firebase-firestore/src/main/java/com/google/firebase/firestore/local/SQLiteMutationQueue.java

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.firebase.firestore.util.Assert.hardAssert;
2020

2121
import android.database.sqlite.SQLiteStatement;
22+
import android.os.ParcelFileDescriptor;
2223
import com.google.firebase.Timestamp;
2324
import com.google.firebase.firestore.auth.User;
2425
import com.google.firebase.firestore.core.Query;
@@ -31,6 +32,7 @@
3132
import com.google.protobuf.ByteString;
3233
import com.google.protobuf.InvalidProtocolBufferException;
3334
import com.google.protobuf.MessageLite;
35+
import java.io.IOException;
3436
import java.util.ArrayList;
3537
import java.util.Arrays;
3638
import java.util.Collections;
@@ -42,6 +44,8 @@
4244
/** A mutation queue for a specific user, backed by SQLite. */
4345
final class SQLiteMutationQueue implements MutationQueue {
4446

47+
private static final int BLOB_MAX_INLINE_LENGTH = 1000000;
48+
4549
private final SQLitePersistence db;
4650
private final LocalSerializer serializer;
4751

@@ -204,9 +208,12 @@ public MutationBatch addMutationBatch(Timestamp localWriteTime, List<Mutation> m
204208
@Nullable
205209
@Override
206210
public MutationBatch lookupMutationBatch(int batchId) {
207-
return db.query("SELECT mutations FROM mutations WHERE uid = ? AND batch_id = ?")
208-
.binding(uid, batchId)
209-
.firstValue(row -> decodeMutationBatch(row.getBlob(0)));
211+
return db.query(
212+
"SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
213+
+ "FROM mutations m "
214+
+ "WHERE uid = ? AND batch_id = ?")
215+
.binding(BLOB_MAX_INLINE_LENGTH, uid, batchId)
216+
.firstValue(row -> decodeMutationBatchRow(row.getInt(0), row.getBlob(1)));
210217
}
211218

212219
@Nullable
@@ -215,19 +222,23 @@ public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
215222
int nextBatchId = batchId + 1;
216223

217224
return db.query(
218-
"SELECT mutations FROM mutations "
225+
"SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
226+
+ "FROM mutations m "
219227
+ "WHERE uid = ? AND batch_id >= ? "
220228
+ "ORDER BY batch_id ASC LIMIT 1")
221-
.binding(uid, nextBatchId)
222-
.firstValue(row -> decodeMutationBatch(row.getBlob(0)));
229+
.binding(BLOB_MAX_INLINE_LENGTH, uid, nextBatchId)
230+
.firstValue(row -> decodeMutationBatchRow(row.getInt(0), row.getBlob(1)));
223231
}
224232

225233
@Override
226234
public List<MutationBatch> getAllMutationBatches() {
227235
List<MutationBatch> result = new ArrayList<>();
228-
db.query("SELECT mutations FROM mutations WHERE uid = ? ORDER BY batch_id ASC")
229-
.binding(uid)
230-
.forEach(row -> result.add(decodeMutationBatch(row.getBlob(0))));
236+
db.query(
237+
"SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
238+
+ "FROM mutations m "
239+
+ "WHERE uid = ? ORDER BY batch_id ASC")
240+
.binding(BLOB_MAX_INLINE_LENGTH, uid)
241+
.forEach(row -> result.add(decodeMutationBatchRow(row.getInt(0), row.getBlob(1))));
231242
return result;
232243
}
233244

@@ -237,14 +248,15 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKey(DocumentKey
237248

238249
List<MutationBatch> result = new ArrayList<>();
239250
db.query(
240-
"SELECT m.mutations FROM document_mutations dm, mutations m "
251+
"SELECT m.batch_id, SUBSTR(m.mutations, 1, ?) "
252+
+ "FROM document_mutations dm, mutations m "
241253
+ "WHERE dm.uid = ? "
242254
+ "AND dm.path = ? "
243255
+ "AND dm.uid = m.uid "
244256
+ "AND dm.batch_id = m.batch_id "
245257
+ "ORDER BY dm.batch_id")
246-
.binding(uid, path)
247-
.forEach(row -> result.add(decodeMutationBatch(row.getBlob(0))));
258+
.binding(BLOB_MAX_INLINE_LENGTH, uid, path)
259+
.forEach(row -> result.add(decodeMutationBatchRow(row.getInt(0), row.getBlob(1))));
248260
return result;
249261
}
250262

@@ -259,10 +271,11 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKeys(
259271
SQLitePersistence.LongQuery longQuery =
260272
new SQLitePersistence.LongQuery(
261273
db,
262-
"SELECT DISTINCT dm.batch_id, m.mutations FROM document_mutations dm, mutations m "
274+
"SELECT DISTINCT dm.batch_id, SUBSTR(m.mutations, 1, ?) "
275+
+ "FROM document_mutations dm, mutations m "
263276
+ "WHERE dm.uid = ? "
264277
+ "AND dm.path IN (",
265-
Arrays.asList(uid),
278+
Arrays.asList(BLOB_MAX_INLINE_LENGTH, uid),
266279
args,
267280
") "
268281
+ "AND dm.uid = m.uid "
@@ -279,7 +292,7 @@ public List<MutationBatch> getAllMutationBatchesAffectingDocumentKeys(
279292
int batchId = row.getInt(0);
280293
if (!uniqueBatchIds.contains(batchId)) {
281294
uniqueBatchIds.add(batchId);
282-
result.add(decodeMutationBatch(row.getBlob(1)));
295+
result.add(decodeMutationBatchRow(batchId, row.getBlob(1)));
283296
}
284297
});
285298
}
@@ -321,14 +334,15 @@ public List<MutationBatch> getAllMutationBatchesAffectingQuery(Query query) {
321334

322335
List<MutationBatch> result = new ArrayList<>();
323336
db.query(
324-
"SELECT dm.batch_id, dm.path, m.mutations FROM document_mutations dm, mutations m "
337+
"SELECT dm.batch_id, dm.path, SUBSTR(m.mutations, 1, ?) "
338+
+ "FROM document_mutations dm, mutations m "
325339
+ "WHERE dm.uid = ? "
326340
+ "AND dm.path >= ? "
327341
+ "AND dm.path < ? "
328342
+ "AND dm.uid = m.uid "
329343
+ "AND dm.batch_id = m.batch_id "
330344
+ "ORDER BY dm.batch_id")
331-
.binding(uid, prefixPath, prefixSuccessorPath)
345+
.binding(BLOB_MAX_INLINE_LENGTH, uid, prefixPath, prefixSuccessorPath)
332346
.forEach(
333347
row -> {
334348
// Ensure unique batches only. This works because the batches come out in order so we
@@ -350,7 +364,7 @@ public List<MutationBatch> getAllMutationBatchesAffectingQuery(Query query) {
350364
return;
351365
}
352366

353-
result.add(decodeMutationBatch(row.getBlob(2)));
367+
result.add(decodeMutationBatchRow(batchId, row.getBlob(2)));
354368
});
355369

356370
return result;
@@ -399,6 +413,38 @@ public void performConsistencyCheck() {
399413
danglingMutationReferences);
400414
}
401415

416+
/**
417+
* Decodes a mutation batch row containing a batch id and a substring of a blob. If the blob is
418+
* too large, executes another query to load the blob directly.
419+
*
420+
* @param batchId The batch ID of the row containing the blob
421+
* @param bytes The bytes represented
422+
* @return
423+
*/
424+
private MutationBatch decodeMutationBatchRow(int batchId, byte[] bytes) {
425+
if (bytes.length < BLOB_MAX_INLINE_LENGTH) {
426+
return decodeMutationBatch(bytes);
427+
}
428+
429+
SQLiteStatement loader =
430+
db.prepare("SELECT mutations FROM mutations WHERE uid = ? AND batch_id = ?");
431+
loader.bindString(1, uid);
432+
loader.bindLong(2, batchId);
433+
434+
ParcelFileDescriptor blobFile = loader.simpleQueryForBlobFileDescriptor();
435+
hardAssert(blobFile != null, "Blob exists so descriptor should not be null");
436+
437+
try (ParcelFileDescriptor.AutoCloseInputStream stream =
438+
new ParcelFileDescriptor.AutoCloseInputStream(blobFile)) {
439+
return serializer.decodeMutationBatch(
440+
com.google.firebase.firestore.proto.WriteBatch.parseFrom(stream));
441+
} catch (InvalidProtocolBufferException e) {
442+
throw fail("MutationBatch failed to parse: %s", e);
443+
} catch (IOException e) {
444+
throw fail("Failed to read blob for uid=%s, batch_id=%d: %s", uid, batchId, e);
445+
}
446+
}
447+
402448
private MutationBatch decodeMutationBatch(byte[] bytes) {
403449
try {
404450
return serializer.decodeMutationBatch(

0 commit comments

Comments
 (0)