Skip to content

Port "Stop persisting last acknowledged batch ID" from iOS #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ final class MemoryMutationQueue implements MutationQueue {
/** The next value to use when assigning sequential IDs to each mutation batch. */
private int nextBatchId;

/** The highest acknowledged mutation in the queue. */
private int highestAcknowledgedBatchId;

/**
* The last received stream token from the server, used to acknowledge which responses the client
* has processed. Stream tokens are opaque checkpoint markers whose only real value is their
Expand All @@ -79,7 +76,6 @@ final class MemoryMutationQueue implements MutationQueue {

batchesByDocumentKey = new ImmutableSortedSet<>(emptyList(), DocumentReference.BY_KEY);
nextBatchId = 1;
highestAcknowledgedBatchId = MutationBatch.UNKNOWN;
lastStreamToken = WriteStream.EMPTY_STREAM_TOKEN;
}

Expand All @@ -90,14 +86,10 @@ public void start() {
// Note: The queue may be shutdown / started multiple times, since we maintain the queue for the
// duration of the app session in case a user logs out / back in. To behave like the
// SQLite-backed MutationQueue (and accommodate tests that expect as much), we reset nextBatchId
// and highestAcknowledgedBatchId if the queue is empty.
// if the queue is empty.
if (isEmpty()) {
nextBatchId = 1;
highestAcknowledgedBatchId = MutationBatch.UNKNOWN;
}
hardAssert(
highestAcknowledgedBatchId < nextBatchId,
"highestAcknowledgedBatchId must be less than the nextBatchId");
}

@Override
Expand All @@ -110,9 +102,6 @@ public boolean isEmpty() {
@Override
public void acknowledgeBatch(MutationBatch batch, ByteString streamToken) {
int batchId = batch.getBatchId();
hardAssert(
batchId > highestAcknowledgedBatchId, "Mutation batchIds must be acknowledged in order");

int batchIndex = indexOfExistingBatchId(batchId, "acknowledged");
hardAssert(batchIndex == 0, "Can only acknowledge the first batch in the mutation queue");

Expand All @@ -124,7 +113,6 @@ public void acknowledgeBatch(MutationBatch batch, ByteString streamToken) {
batchId,
check.getBatchId());

highestAcknowledgedBatchId = batchId;
lastStreamToken = checkNotNull(streamToken);
}

Expand Down Expand Up @@ -180,10 +168,7 @@ public MutationBatch lookupMutationBatch(int batchId) {
@Nullable
@Override
public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
// All batches with batchId <= highestAcknowledgedBatchId have been acknowledged so the
// first unacknowledged batch after batchId will have a batchId larger than both of these
// values.
int nextBatchId = Math.max(batchId, highestAcknowledgedBatchId) + 1;
int nextBatchId = batchId + 1;

// The requested batchId may still be out of range so normalize it to the start of the queue.
int rawIndex = indexOfBatchId(nextBatchId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ interface MutationQueue {
/**
* Starts the mutation queue, performing any initial reads that might be required to establish
* invariants, etc.
*
* <p>After starting, the mutation queue must guarantee that the highestAcknowledgedBatchID is
* less than nextBatchID. This prevents the local store from creating new batches that the
* mutation queue would consider erroneously acknowledged.
*/
void start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,6 @@ final class SQLiteMutationQueue implements MutationQueue {
*/
private int nextBatchId;

/**
* An identifier for the highest numbered batch that has been acknowledged by the server. All
* MutationBatches in this queue with batch_ids less than or equal to this value are considered to
* have been acknowledged by the server.
*/
private int lastAcknowledgedBatchId;

/**
* A stream token that was previously sent by the server.
*
Expand Down Expand Up @@ -94,30 +87,18 @@ final class SQLiteMutationQueue implements MutationQueue {
public void start() {
loadNextBatchIdAcrossAllUsers();

// On restart, nextBatchId may end up lower than lastAcknowledgedBatchId since it's computed
// from the queue contents, and there may be no mutations in the queue. In this case, we need
// to reset lastAcknowledgedBatchId (which is safe since the queue must be empty).
lastAcknowledgedBatchId = MutationBatch.UNKNOWN;
int rows =
db.query(
"SELECT last_acknowledged_batch_id, last_stream_token "
+ "FROM mutation_queues WHERE uid = ?")
db.query("SELECT last_stream_token FROM mutation_queues WHERE uid = ?")
.binding(uid)
.first(
row -> {
lastAcknowledgedBatchId = row.getInt(0);
lastStreamToken = ByteString.copyFrom(row.getBlob(1));
lastStreamToken = ByteString.copyFrom(row.getBlob(0));
});

if (rows == 0) {
// Ensure we write a default entry in mutation_queues since loadNextBatchIdAcrossAllUsers()
// depends upon every queue having an entry.
writeMutationQueueMetadata();

} else if (lastAcknowledgedBatchId >= nextBatchId) {
hardAssert(isEmpty(), "Reset nextBatchId is only possible when the queue is empty");
lastAcknowledgedBatchId = MutationBatch.UNKNOWN;
writeMutationQueueMetadata();
}
}

Expand Down Expand Up @@ -161,11 +142,6 @@ public boolean isEmpty() {

@Override
public void acknowledgeBatch(MutationBatch batch, ByteString streamToken) {
int batchId = batch.getBatchId();
hardAssert(
batchId > lastAcknowledgedBatchId, "Mutation batchIds must be acknowledged in order");

lastAcknowledgedBatchId = batchId;
lastStreamToken = checkNotNull(streamToken);
writeMutationQueueMetadata();
}
Expand All @@ -187,7 +163,7 @@ private void writeMutationQueueMetadata() {
+ "(uid, last_acknowledged_batch_id, last_stream_token) "
+ "VALUES (?, ?, ?)",
uid,
lastAcknowledgedBatchId,
-1,
lastStreamToken.toByteArray());
}

Expand Down Expand Up @@ -236,9 +212,7 @@ public MutationBatch lookupMutationBatch(int batchId) {
@Nullable
@Override
public MutationBatch getNextMutationBatchAfterBatchId(int batchId) {
// All batches with batchId <= lastAcknowledgedBatchId have been acknowledged so the first
// unacknowledged batch after batchID will have a batchID larger than both of these values.
int nextBatchId = Math.max(batchId, lastAcknowledgedBatchId) + 1;
int nextBatchId = batchId + 1;

return db.query(
"SELECT mutations FROM mutations "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ private void createV1MutationQueue() {
});
}

/** Note: as of this migration, `last_acknowledged_batch_id` is no longer used by the code. */
private void removeAcknowledgedMutations() {
SQLitePersistence.Query mutationQueuesQuery =
new SQLitePersistence.Query(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,23 +174,6 @@ public void testNextMutationBatchAfterBatchId() {
assertNull(notFound);
}

@Test
public void testNextMutationBatchAfterBatchIdSkipsAcknowledgedBatches() {
List<MutationBatch> batches = createBatches(3);
assertEquals(
batches.get(0), mutationQueue.getNextMutationBatchAfterBatchId(MutationBatch.UNKNOWN));

acknowledgeBatch(batches.get(0));
assertEquals(
batches.get(1), mutationQueue.getNextMutationBatchAfterBatchId(MutationBatch.UNKNOWN));
assertEquals(
batches.get(1),
mutationQueue.getNextMutationBatchAfterBatchId(batches.get(0).getBatchId()));
assertEquals(
batches.get(2),
mutationQueue.getNextMutationBatchAfterBatchId(batches.get(1).getBatchId()));
}

@Test
public void testAllMutationBatchesAffectingDocumentKey() {
List<Mutation> mutations =
Expand Down