Skip to content

Don't delay retryable operations #3270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 36 additions & 24 deletions packages/firestore/src/util/async_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,9 @@ export class AsyncQueue {
// The last promise in the queue.
private tail: Promise<unknown> = Promise.resolve();

// The last retryable operation. Retryable operation are run in order and
// A list of retryable operations. Retryable operations are run in order and
// retried with backoff.
private retryableTail: Promise<void> = Promise.resolve();
private retryableOps: Array<() => Promise<void>> = [];

// Is this AsyncQueue being shut down? Once it is set to true, it will not
// be changed again.
Expand Down Expand Up @@ -323,32 +323,44 @@ export class AsyncQueue {
* operations were retried successfully.
*/
enqueueRetryable(op: () => Promise<void>): void {
this.verifyNotFailed();
this.retryableOps.push(op);
this.enqueueAndForget(() => this.retryNextOp());
}

if (this._isShuttingDown) {
/**
* Runs the next operation from the retryable queue. If the operation fails,
* reschedules with backoff.
*/
private async retryNextOp(): Promise<void> {
if (this.retryableOps.length === 0) {
return;
}

this.retryableTail = this.retryableTail.then(() => {
const deferred = new Deferred<void>();
const retryingOp = async (): Promise<void> => {
try {
await op();
deferred.resolve();
this.backoff.reset();
} catch (e) {
if (isIndexedDbTransactionError(e)) {
logDebug(LOG_TAG, 'Operation failed with retryable error: ' + e);
this.backoff.backoffAndRun(retryingOp);
} else {
deferred.resolve();
throw e; // Failure will be handled by AsyncQueue
}
}
};
this.enqueueAndForget(retryingOp);
return deferred.promise;
});
try {
await this.retryableOps[0]();
this.retryableOps.shift();
this.backoff.reset();
} catch (e) {
if (isIndexedDbTransactionError(e)) {
logDebug(LOG_TAG, 'Operation failed with retryable error: ' + e);
} else {
throw e; // Failure will be handled by AsyncQueue
}
}

if (this.retryableOps.length > 0) {
// If there are additional operations, we re-schedule `retryNextOp()`.
// This is necessary to run retryable operations that failed during
// their initial attempt since we don't know whether they are already
// enqueued. If, for example, `op1`, `op2`, `op3` are enqueued and `op1`
// needs to be re-run, we will run `op1`, `op1`, `op2` using the
// already enqueued calls to `retryNextOp()`. `op3()` will then run in the
// call scheduled here.
// Since `backoffAndRun()` cancels an existing backoff and schedules a
// new backoff on every call, there is only ever a single additional
// operation in the queue.
this.backoff.backoffAndRun(() => this.retryNextOp());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: why is backoff invoked in the successful case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a comment.

}
}

private enqueueInternal<T extends unknown>(op: () => Promise<T>): Promise<T> {
Expand Down
1 change: 1 addition & 0 deletions packages/firestore/test/unit/specs/recovery_spec.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -787,6 +787,7 @@ describeSpec('Persistence Recovery', ['no-ios', 'no-android'], () => {
.expectActiveTargets({ query })
// We are now user 2
.expectEvents(query, { removed: [doc1], fromCache: true })
.runTimer(TimerId.AsyncQueueRetry)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: why is this change necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous implementation used "enqueue" to run the next delayed operation, this implementation uses "enqueueWithBackoff". I verified that the backoff is 0 at this step, but even operations with zero backoff aren't automatically drained in the SpecTestRunner.

// We are now user 1
.expectEvents(query, {
added: [doc1],
Expand Down
2 changes: 1 addition & 1 deletion packages/firestore/test/unit/specs/spec_test_runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ abstract class TestRunner {
}

async shutdown(): Promise<void> {
await this.queue.enqueue(async () => {
await this.queue.enqueueAndInitiateShutdown(async () => {
if (this.started) {
await this.doShutdown();
}
Expand Down
62 changes: 60 additions & 2 deletions packages/firestore/test/unit/util/async_queue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,7 @@ describe('AsyncQueue', () => {

queue.enqueueRetryable(async () => {
doStep(1);
if (completedSteps.length > 1) {
} else {
if (completedSteps.length === 1) {
throw new IndexedDbTransactionError(
new Error('Simulated retryable error')
);
Expand All @@ -313,6 +312,65 @@ describe('AsyncQueue', () => {
expect(completedSteps).to.deep.equal([1, 1, 2]);
});

it('Does not delay retryable operations that succeed', async () => {
const queue = new AsyncQueue();
const completedSteps: number[] = [];
const doStep = (n: number): void => {
completedSteps.push(n);
};

queue.enqueueRetryable(async () => {
doStep(1);
});
queue.enqueueAndForget(async () => {
doStep(2);
});
await queue.enqueue(async () => {
doStep(3);
});

expect(completedSteps).to.deep.equal([1, 2, 3]);
});

it('Catches up when retryable operation fails', async () => {
const queue = new AsyncQueue();
const completedSteps: number[] = [];
const doStep = (n: number): void => {
completedSteps.push(n);
};

const blockingPromise = new Deferred<void>();

queue.enqueueRetryable(async () => {
doStep(1);
if (completedSteps.length === 1) {
throw new IndexedDbTransactionError(
new Error('Simulated retryable error')
);
}
});
queue.enqueueAndForget(async () => {
doStep(2);
});
queue.enqueueRetryable(async () => {
doStep(3);
blockingPromise.resolve();
});
await blockingPromise.promise;

// Once all existing retryable operations succeeded, they are scheduled
// in the order they are enqueued.
queue.enqueueAndForget(async () => {
doStep(4);
});
await queue.enqueue(async () => {
doStep(5);
});

await blockingPromise.promise;
expect(completedSteps).to.deep.equal([1, 2, 1, 3, 4, 5]);
});

it('Can drain (non-delayed) operations', async () => {
const queue = new AsyncQueue();
const completedSteps: number[] = [];
Expand Down