Skip to content

Simplify Listen API #3491

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Jul 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 9 additions & 17 deletions packages/firestore/exp/src/api/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ import { debugAssert } from '../../../src/util/assert';
import { cast } from '../../../lite/src/api/util';
import { DocumentSnapshot, QuerySnapshot } from './snapshot';
import {
addDocSnapshotListener,
addQuerySnapshotListener,
addSnapshotsInSyncListener,
applyFirestoreDataConverter,
getDocsViaSnapshotListener,
getDocViaSnapshotListener,
Expand All @@ -60,6 +57,7 @@ import {
Unsubscribe
} from '../../../src/api/observer';
import { getFirestoreClient } from './components';
import { newQueryForPath } from '../../../src/core/query';

export function getDoc<T>(
reference: firestore.DocumentReference<T>
Expand Down Expand Up @@ -369,7 +367,7 @@ export function onSnapshot<T>(
args[currArg + 2] = userObserver.complete?.bind(userObserver);
}

let asyncObserver: Promise<Unsubscribe>;
let asyncUnsubscribe: Promise<Unsubscribe>;

if (ref instanceof DocumentReference) {
const firestore = cast(ref.firestore, Firestore);
Expand All @@ -386,10 +384,9 @@ export function onSnapshot<T>(
complete: args[currArg + 2] as CompleteFn
};

asyncObserver = getFirestoreClient(firestore).then(firestoreClient =>
addDocSnapshotListener(
firestoreClient,
ref._key,
asyncUnsubscribe = getFirestoreClient(firestore).then(firestoreClient =>
firestoreClient.listen(
newQueryForPath(ref._key.path),
internalOptions,
observer
)
Expand All @@ -412,21 +409,16 @@ export function onSnapshot<T>(

validateHasExplicitOrderByForLimitToLast(query._query);

asyncObserver = getFirestoreClient(firestore).then(firestoreClient =>
addQuerySnapshotListener(
firestoreClient,
query._query,
internalOptions,
observer
)
asyncUnsubscribe = getFirestoreClient(firestore).then(firestoreClient =>
firestoreClient.listen(query._query, internalOptions, observer)
);
}

// TODO(firestorexp): Add test that verifies that we don't raise a snapshot if
// unsubscribe is called before `asyncObserver` resolves.
return () => {
// eslint-disable-next-line @typescript-eslint/no-floating-promises
asyncObserver.then(unsubscribe => unsubscribe());
asyncUnsubscribe.then(unsubscribe => unsubscribe());
};
}

Expand Down Expand Up @@ -458,7 +450,7 @@ export function onSnapshotsInSync(
const asyncObserver = getFirestoreClient(
firestoreImpl
).then(firestoreClient =>
addSnapshotsInSyncListener(firestoreClient, observer)
firestoreClient.addSnapshotsInSyncListener(observer)
);

// TODO(firestorexp): Add test that verifies that we don't raise a snapshot if
Expand Down
120 changes: 12 additions & 108 deletions packages/firestore/src/api/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import { _FirebaseApp, FirebaseService } from '@firebase/app-types/private';
import { DatabaseId, DatabaseInfo } from '../core/database_info';
import { ListenOptions } from '../core/event_manager';
import {
OnlineComponentProvider,
MemoryOfflineComponentProvider,
OfflineComponentProvider
OfflineComponentProvider,
OnlineComponentProvider
} from '../core/component_provider';
import { FirestoreClient, PersistenceSettings } from '../core/firestore_client';
import {
Expand Down Expand Up @@ -60,7 +60,6 @@ import { FieldPath, ResourcePath } from '../model/path';
import { isServerTimestamp } from '../model/server_timestamps';
import { refValue } from '../model/values';
import { debugAssert, fail } from '../util/assert';
import { AsyncObserver } from '../util/async_observer';
import { AsyncQueue } from '../util/async_queue';
import { Code, FirestoreError } from '../util/error';
import {
Expand Down Expand Up @@ -485,16 +484,15 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
this.ensureClientConfigured();

if (isPartialObserver(arg)) {
return addSnapshotsInSyncListener(
this._firestoreClient!,
return this._firestoreClient!.addSnapshotsInSyncListener(
arg as PartialObserver<void>
);
} else {
validateArgType('Firestore.onSnapshotsInSync', 'function', 1, arg);
const observer: PartialObserver<void> = {
next: arg as () => void
};
return addSnapshotsInSyncListener(this._firestoreClient!, observer);
return this._firestoreClient!.addSnapshotsInSyncListener(observer);
}
}

Expand Down Expand Up @@ -686,29 +684,6 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
}
}

/** Registers the listener for onSnapshotsInSync() */
export function addSnapshotsInSyncListener(
firestoreClient: FirestoreClient,
observer: PartialObserver<void>
): Unsubscribe {
const errHandler = (err: Error): void => {
throw fail('Uncaught Error in onSnapshotsInSync');
};
const asyncObserver = new AsyncObserver<void>({
next: () => {
if (observer.next) {
observer.next();
}
},
error: errHandler
});
firestoreClient.addSnapshotsInSyncListener(asyncObserver);
return () => {
asyncObserver.mute();
firestoreClient.removeSnapshotsInSyncListener(asyncObserver);
};
}

/**
* A reference to a transaction.
*/
Expand Down Expand Up @@ -1183,7 +1158,7 @@ export class DocumentReference<T = firestore.DocumentData>
1,
4
);
let options: firestore.SnapshotListenOptions = {
let options: ListenOptions = {
includeMetadataChanges: false
};
let currArg = 0;
Expand Down Expand Up @@ -1248,9 +1223,8 @@ export class DocumentReference<T = firestore.DocumentData>
complete: args[currArg + 2] as CompleteFn
};

return addDocSnapshotListener(
this._firestoreClient,
this._key,
return this._firestoreClient.listen(
newQueryForPath(this._key.path),
internalOptions,
observer
);
Expand Down Expand Up @@ -1312,40 +1286,6 @@ export class DocumentReference<T = firestore.DocumentData>
}
}

/** Registers an internal snapshot listener for `ref`. */
export function addDocSnapshotListener(
firestoreClient: FirestoreClient,
key: DocumentKey,
options: ListenOptions,
observer: PartialObserver<ViewSnapshot>
): Unsubscribe {
let errHandler = (err: Error): void => {
console.error('Uncaught Error in onSnapshot:', err);
};
if (observer.error) {
errHandler = observer.error.bind(observer);
}

const asyncObserver = new AsyncObserver<ViewSnapshot>({
next: snapshot => {
if (observer.next) {
observer.next(snapshot);
}
},
error: errHandler
});
const internalListener = firestoreClient.listen(
newQueryForPath(key.path),
asyncObserver,
options
);

return () => {
asyncObserver.mute();
firestoreClient.unlisten(internalListener);
};
}

/**
* Retrieves a latency-compensated document from the backend via a
* SnapshotListener.
Expand All @@ -1356,9 +1296,8 @@ export function getDocViaSnapshotListener(
options?: firestore.GetOptions
): Promise<ViewSnapshot> {
const result = new Deferred<ViewSnapshot>();
const unlisten = addDocSnapshotListener(
firestoreClient,
key,
const unlisten = firestoreClient.listen(
newQueryForPath(key.path),
{
includeMetadataChanges: true,
waitForSyncWhenOnline: true
Expand Down Expand Up @@ -2159,7 +2098,7 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {

onSnapshot(...args: unknown[]): Unsubscribe {
validateBetweenNumberOfArgs('Query.onSnapshot', arguments, 1, 4);
let options: firestore.SnapshotListenOptions = {};
let options: ListenOptions = {};
let currArg = 0;
if (
typeof args[currArg] === 'object' &&
Expand Down Expand Up @@ -2220,12 +2159,7 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {

validateHasExplicitOrderByForLimitToLast(this._query);
const firestoreClient = this.firestore.ensureClientConfigured();
return addQuerySnapshotListener(
firestoreClient,
this._query,
options,
observer
);
return firestoreClient.listen(this._query, options, observer);
}

get(options?: firestore.GetOptions): Promise<firestore.QuerySnapshot<T>> {
Expand Down Expand Up @@ -2254,8 +2188,7 @@ export function getDocsViaSnapshotListener(
options?: firestore.GetOptions
): Promise<ViewSnapshot> {
const result = new Deferred<ViewSnapshot>();
const unlisten = addQuerySnapshotListener(
firestore,
const unlisten = firestore.listen(
query,
{
includeMetadataChanges: true,
Expand Down Expand Up @@ -2287,35 +2220,6 @@ export function getDocsViaSnapshotListener(
return result.promise;
}

/** Registers an internal snapshot listener for `query`. */
export function addQuerySnapshotListener(
firestore: FirestoreClient,
query: InternalQuery,
options: ListenOptions,
observer: PartialObserver<ViewSnapshot>
): Unsubscribe {
let errHandler = (err: Error): void => {
console.error('Uncaught Error in onSnapshot:', err);
};
if (observer.error) {
errHandler = observer.error.bind(observer);
}
const asyncObserver = new AsyncObserver<ViewSnapshot>({
next: (result: ViewSnapshot): void => {
if (observer.next) {
observer.next(result);
}
},
error: errHandler
});

const internalListener = firestore.listen(query, asyncObserver, options);
return (): void => {
asyncObserver.mute();
firestore.unlisten(internalListener);
};
}

export class QuerySnapshot<T = firestore.DocumentData>
implements firestore.QuerySnapshot<T> {
private _cachedChanges: Array<firestore.DocumentChange<T>> | null = null;
Expand Down
55 changes: 21 additions & 34 deletions packages/firestore/src/core/firestore_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import {
} from './event_manager';
import { SyncEngine } from './sync_engine';
import { View } from './view';

import { SharedClientState } from '../local/shared_client_state';
import { AutoId } from '../util/misc';
import { DatabaseId, DatabaseInfo } from './database_info';
Expand All @@ -47,6 +46,7 @@ import {
MemoryOfflineComponentProvider,
OfflineComponentProvider
} from './component_provider';
import { AsyncObserver } from '../util/async_observer';

const LOG_TAG = 'FirestoreClient';
const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
Expand Down Expand Up @@ -391,24 +391,17 @@ export class FirestoreClient {

listen(
query: Query,
observer: Observer<ViewSnapshot>,
options: ListenOptions
): QueryListener {
options: ListenOptions,
observer: Partial<Observer<ViewSnapshot>>
): () => void {
this.verifyNotTerminated();
const listener = new QueryListener(query, observer, options);
const wrappedObserver = new AsyncObserver(observer);
const listener = new QueryListener(query, wrappedObserver, options);
this.asyncQueue.enqueueAndForget(() => this.eventMgr.listen(listener));
return listener;
}

unlisten(listener: QueryListener): void {
// Checks for termination but does not raise error, allowing unlisten after
// termination to be a no-op.
if (this.clientTerminated) {
return;
}
this.asyncQueue.enqueueAndForget(() => {
return this.eventMgr.unlisten(listener);
});
return () => {
wrappedObserver.mute();
this.asyncQueue.enqueueAndForget(() => this.eventMgr.unlisten(listener));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@schmidt-sebastian Can you just confirm that it is indeed unnecessary to check for if (this.clientTerminated) before calling this.asyncQueue.enqueueAndForget()? I'm not familiar enough with AsyncQueue to know, but the old code avoids calling this.asyncQueue.enqueueAndForget() in the case that this.clientTerminated is false. The new code will call it regardless of the value of this.clientTerminated.

This comment also applies to addSnapshotsInSyncListener() below.

};
}

async getDocumentFromLocalCache(
Expand Down Expand Up @@ -486,24 +479,18 @@ export class FirestoreClient {
return this.databaseInfo.databaseId;
}

addSnapshotsInSyncListener(observer: Observer<void>): void {
addSnapshotsInSyncListener(observer: Partial<Observer<void>>): () => void {
this.verifyNotTerminated();
this.asyncQueue.enqueueAndForget(() => {
this.eventMgr.addSnapshotsInSyncListener(observer);
return Promise.resolve();
});
}

removeSnapshotsInSyncListener(observer: Observer<void>): void {
// Checks for shutdown but does not raise error, allowing remove after
// shutdown to be a no-op.
if (this.clientTerminated) {
return;
}
this.asyncQueue.enqueueAndForget(() => {
this.eventMgr.removeSnapshotsInSyncListener(observer);
return Promise.resolve();
});
const wrappedObserver = new AsyncObserver(observer);
this.asyncQueue.enqueueAndForget(async () =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it necessary to specify an async function here to this.asyncQueue.enqueueAndForget()? In the listen() method it just specifies a normal, non-async function. I see that this is consistent with the old code but the reason for the inconsistency within this file is non-obvious to me. Should all calls to enqueueAndForget() be async functions? It would appear so based on the signature of enqueueAndForget().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a test already:

it('can unlisten queries after termination', async () => {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh perfect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry - I replied to the wrong comment (but looks like you managed to restore the context). async makes the function return a Promise, even if the return of the value being returned is not a Promise. In the case above, "eventManager.listen" already returns a Promise.

This change is not needed, it just felt right to me. Manually returning Promise.resolve works as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. I forgot to check the return type of eventManager.listen(). With this explanation, I'd recommend to keep it as is, with the more modern async keyword.

this.eventMgr.addSnapshotsInSyncListener(wrappedObserver)
);
return () => {
wrappedObserver.mute();
this.asyncQueue.enqueueAndForget(async () =>
this.eventMgr.removeSnapshotsInSyncListener(wrappedObserver)
);
};
}

get clientTerminated(): boolean {
Expand Down
12 changes: 9 additions & 3 deletions packages/firestore/src/util/async_observer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,20 @@ export class AsyncObserver<T> implements Observer<T> {
*/
private muted = false;

constructor(private observer: Observer<T>) {}
constructor(private observer: Partial<Observer<T>>) {}

next(value: T): void {
this.scheduleEvent(this.observer.next, value);
if (this.observer.next) {
this.scheduleEvent(this.observer.next, value);
}
}

error(error: Error): void {
this.scheduleEvent(this.observer.error, error);
if (this.observer.error) {
this.scheduleEvent(this.observer.error, error);
} else {
console.error('Uncaught Error in snapshot listener:', error);
}
}

mute(): void {
Expand Down