Skip to content

Firestore: Fix spurious "Backend didn't respond within 10 seconds" errors when network just slow #8145

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Apr 18, 2024
6 changes: 6 additions & 0 deletions .changeset/early-tomatoes-occur.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@firebase/firestore': patch
'firebase': patch
---

Prevent spurious "Backend didn't respond within 10 seconds" errors when network is indeed responding, just slowly.
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ export class WebChannelConnection extends RestConnection {
LOG_TAG,
`RPC '${rpcName}' stream ${streamId} transport opened.`
);
streamBridge.callOnConnected();
}
});

Expand Down
6 changes: 6 additions & 0 deletions packages/firestore/src/platform/node/grpc_connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,9 +286,15 @@ export class GrpcConnection implements Connection {
}
});

let onConnectedSent = false;
grpcStream.on('data', (msg: Resp) => {
if (!closed) {
logDebug(LOG_TAG, `RPC '${rpcName}' stream ${streamId} received:`, msg);
// Emulate the "onConnected" event that WebChannelConnection sends.
if (!onConnectedSent) {
stream.callOnConnected();
onConnectedSent = true;
}
stream.callOnMessage(msg);
}
});
Expand Down
8 changes: 6 additions & 2 deletions packages/firestore/src/remote/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,14 @@ export interface Connection {
* A bidirectional stream that can be used to send an receive messages.
*
* A stream can be closed locally with close() or can be closed remotely or
* through network errors. onClose is guaranteed to be called. onOpen will only
* be called if the stream successfully established a connection.
* through network errors. onClose is guaranteed to be called. onOpen will be
* called once the stream is ready to send messages (which may or may not be
* before an actual connection to the backend has been established). The
* onConnected event is called when an actual, physical connection with the
* backend has been established, and may occur before or after the onOpen event.
*/
export interface Stream<I, O> {
onConnected(callback: () => void): void;
onOpen(callback: () => void): void;
onClose(callback: (err?: FirestoreError) => void): void;
onMessage(callback: (msg: O) => void): void;
Expand Down
8 changes: 8 additions & 0 deletions packages/firestore/src/remote/persistent_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,11 @@ const enum PersistentStreamState {
* events by the concrete implementation classes.
*/
export interface PersistentStreamListener {
/**
* Called after receiving an acknowledgement from the server, confirming that
* we are able to connect to it.
*/
onConnected: () => Promise<void>;
/**
* Called after the stream was established and can accept outgoing
* messages
Expand Down Expand Up @@ -483,6 +488,9 @@ export abstract class PersistentStream<
const dispatchIfNotClosed = this.getCloseGuardedDispatcher(this.closeCount);

this.stream = this.startRpc(authToken, appCheckToken);
this.stream.onConnected(() => {
dispatchIfNotClosed(() => this.listener!.onConnected());
});
this.stream.onOpen(() => {
dispatchIfNotClosed(() => {
debugAssert(
Expand Down
9 changes: 9 additions & 0 deletions packages/firestore/src/remote/remote_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,13 @@ function cleanUpWatchStreamState(remoteStoreImpl: RemoteStoreImpl): void {
remoteStoreImpl.watchChangeAggregator = undefined;
}

async function onWatchStreamConnected(
remoteStoreImpl: RemoteStoreImpl
): Promise<void> {
// Mark the client as online since we got a "connected" notification.
remoteStoreImpl.onlineStateTracker.set(OnlineState.Online);
}

async function onWatchStreamOpen(
remoteStoreImpl: RemoteStoreImpl
): Promise<void> {
Expand Down Expand Up @@ -923,6 +930,7 @@ function ensureWatchStream(
remoteStoreImpl.datastore,
remoteStoreImpl.asyncQueue,
{
onConnected: onWatchStreamConnected.bind(null, remoteStoreImpl),
onOpen: onWatchStreamOpen.bind(null, remoteStoreImpl),
onClose: onWatchStreamClose.bind(null, remoteStoreImpl),
onWatchChange: onWatchStreamChange.bind(null, remoteStoreImpl)
Expand Down Expand Up @@ -969,6 +977,7 @@ function ensureWriteStream(
remoteStoreImpl.datastore,
remoteStoreImpl.asyncQueue,
{
onConnected: () => Promise.resolve(),
onOpen: onWriteStreamOpen.bind(null, remoteStoreImpl),
onClose: onWriteStreamClose.bind(null, remoteStoreImpl),
onHandshakeComplete: onWriteHandshakeComplete.bind(
Expand Down
17 changes: 17 additions & 0 deletions packages/firestore/src/remote/stream_bridge.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import { Stream } from './connection';
* interface. The stream callbacks are invoked with the callOn... methods.
*/
export class StreamBridge<I, O> implements Stream<I, O> {
private wrappedOnConnected: (() => void) | undefined;
private wrappedOnOpen: (() => void) | undefined;
private wrappedOnClose: ((err?: FirestoreError) => void) | undefined;
private wrappedOnMessage: ((msg: O) => void) | undefined;
Expand All @@ -38,6 +39,14 @@ export class StreamBridge<I, O> implements Stream<I, O> {
this.closeFn = args.closeFn;
}

onConnected(callback: () => void): void {
debugAssert(
!this.wrappedOnConnected,
'Called onConnected on stream twice!'
);
this.wrappedOnConnected = callback;
}

onOpen(callback: () => void): void {
debugAssert(!this.wrappedOnOpen, 'Called onOpen on stream twice!');
this.wrappedOnOpen = callback;
Expand All @@ -61,6 +70,14 @@ export class StreamBridge<I, O> implements Stream<I, O> {
this.sendFn(msg);
}

callOnConnected(): void {
debugAssert(
this.wrappedOnConnected !== undefined,
'Cannot call onConnected because no callback was set'
);
this.wrappedOnConnected();
}

callOnOpen(): void {
debugAssert(
this.wrappedOnOpen !== undefined,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ describeFn('WebChannel', () => {
}
};

// Register an "onConnected" callback since it's required, even though we
// don't care about this event.
stream.onConnected(() => {});

// Once the stream is open, send an "add_target" request
stream.onOpen(() => {
stream.send(payload);
Expand Down
38 changes: 35 additions & 3 deletions packages/firestore/test/integration/remote/stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ import {
Token
} from '../../../src/api/credentials';
import { SnapshotVersion } from '../../../src/core/snapshot_version';
import { Target } from '../../../src/core/target';
import { TargetData, TargetPurpose } from '../../../src/local/target_data';
import { MutationResult } from '../../../src/model/mutation';
import { ResourcePath } from '../../../src/model/path';
import {
newPersistentWatchStream,
newPersistentWriteStream
Expand Down Expand Up @@ -57,7 +60,8 @@ type StreamEventType =
| 'mutationResult'
| 'watchChange'
| 'open'
| 'close';
| 'close'
| 'connected';

const SINGLE_MUTATION = [setMutation('docs/1', { foo: 'bar' })];

Expand Down Expand Up @@ -117,6 +121,10 @@ class StreamStatusListener implements WatchStreamListener, WriteStreamListener {
return this.resolvePending('watchChange');
}

onConnected(): Promise<void> {
return this.resolvePending('connected');
}

onOpen(): Promise<void> {
return this.resolvePending('open');
}
Expand Down Expand Up @@ -148,6 +156,14 @@ describe('Watch Stream', () => {
});
});
});

it('gets connected event before first message', () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I find the title of this a little misleading because the test doesn't appear to test for any messages. Or does it and I'm just not seeing how?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The awaitCallback() will fail if some other callback occurs (such as a message). The test doesn't care about if the "message" callback occurs, only that "connected" occurs before it. Does that clarify?

return withTestWatchStream(async (watchStream, streamListener) => {
await streamListener.awaitCallback('open');
watchStream.watch(sampleTargetData());
await streamListener.awaitCallback('connected');
});
});
});

class MockAuthCredentialsProvider extends EmptyAuthCredentialsProvider {
Expand Down Expand Up @@ -190,6 +206,7 @@ describe('Write Stream', () => {
'Handshake must be complete before writing mutations'
);
writeStream.writeHandshake();
await streamListener.awaitCallback('connected');
await streamListener.awaitCallback('handshakeComplete');

// Now writes should succeed
Expand All @@ -205,9 +222,10 @@ describe('Write Stream', () => {
return withTestWriteStream((writeStream, streamListener, queue) => {
return streamListener
.awaitCallback('open')
.then(() => {
.then(async () => {
writeStream.writeHandshake();
return streamListener.awaitCallback('handshakeComplete');
await streamListener.awaitCallback('connected');
await streamListener.awaitCallback('handshakeComplete');
})
.then(() => {
writeStream.markIdle();
Expand All @@ -228,6 +246,7 @@ describe('Write Stream', () => {
return withTestWriteStream(async (writeStream, streamListener, queue) => {
await streamListener.awaitCallback('open');
writeStream.writeHandshake();
await streamListener.awaitCallback('connected');
await streamListener.awaitCallback('handshakeComplete');

// Mark the stream idle, but immediately cancel the idle timer by issuing another write.
Expand Down Expand Up @@ -336,3 +355,16 @@ export async function withTestWatchStream(
streamListener.verifyNoPendingCallbacks();
});
}

function sampleTargetData(): TargetData {
const target: Target = {
path: ResourcePath.emptyPath(),
collectionGroup: null,
orderBy: [],
filters: [],
limit: null,
startAt: null,
endAt: null
};
return new TargetData(target, 1, TargetPurpose.Listen, 1);
}