Skip to content

Commit 217dca9

Browse files
Simplify Listen API (#3491)
1 parent acfedd2 commit 217dca9

File tree

4 files changed

+51
-162
lines changed

4 files changed

+51
-162
lines changed

packages/firestore/exp/src/api/reference.ts

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ import { debugAssert } from '../../../src/util/assert';
3131
import { cast } from '../../../lite/src/api/util';
3232
import { DocumentSnapshot, QuerySnapshot } from './snapshot';
3333
import {
34-
addDocSnapshotListener,
35-
addQuerySnapshotListener,
36-
addSnapshotsInSyncListener,
3734
applyFirestoreDataConverter,
3835
getDocsViaSnapshotListener,
3936
getDocViaSnapshotListener,
@@ -60,6 +57,7 @@ import {
6057
Unsubscribe
6158
} from '../../../src/api/observer';
6259
import { getFirestoreClient } from './components';
60+
import { newQueryForPath } from '../../../src/core/query';
6361

6462
export function getDoc<T>(
6563
reference: firestore.DocumentReference<T>
@@ -369,7 +367,7 @@ export function onSnapshot<T>(
369367
args[currArg + 2] = userObserver.complete?.bind(userObserver);
370368
}
371369

372-
let asyncObserver: Promise<Unsubscribe>;
370+
let asyncUnsubscribe: Promise<Unsubscribe>;
373371

374372
if (ref instanceof DocumentReference) {
375373
const firestore = cast(ref.firestore, Firestore);
@@ -386,10 +384,9 @@ export function onSnapshot<T>(
386384
complete: args[currArg + 2] as CompleteFn
387385
};
388386

389-
asyncObserver = getFirestoreClient(firestore).then(firestoreClient =>
390-
addDocSnapshotListener(
391-
firestoreClient,
392-
ref._key,
387+
asyncUnsubscribe = getFirestoreClient(firestore).then(firestoreClient =>
388+
firestoreClient.listen(
389+
newQueryForPath(ref._key.path),
393390
internalOptions,
394391
observer
395392
)
@@ -412,21 +409,16 @@ export function onSnapshot<T>(
412409

413410
validateHasExplicitOrderByForLimitToLast(query._query);
414411

415-
asyncObserver = getFirestoreClient(firestore).then(firestoreClient =>
416-
addQuerySnapshotListener(
417-
firestoreClient,
418-
query._query,
419-
internalOptions,
420-
observer
421-
)
412+
asyncUnsubscribe = getFirestoreClient(firestore).then(firestoreClient =>
413+
firestoreClient.listen(query._query, internalOptions, observer)
422414
);
423415
}
424416

425417
// TODO(firestorexp): Add test that verifies that we don't raise a snapshot if
426418
// unsubscribe is called before `asyncObserver` resolves.
427419
return () => {
428420
// eslint-disable-next-line @typescript-eslint/no-floating-promises
429-
asyncObserver.then(unsubscribe => unsubscribe());
421+
asyncUnsubscribe.then(unsubscribe => unsubscribe());
430422
};
431423
}
432424

@@ -458,7 +450,7 @@ export function onSnapshotsInSync(
458450
const asyncObserver = getFirestoreClient(
459451
firestoreImpl
460452
).then(firestoreClient =>
461-
addSnapshotsInSyncListener(firestoreClient, observer)
453+
firestoreClient.addSnapshotsInSyncListener(observer)
462454
);
463455

464456
// TODO(firestorexp): Add test that verifies that we don't raise a snapshot if

packages/firestore/src/api/database.ts

Lines changed: 12 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ import { _FirebaseApp, FirebaseService } from '@firebase/app-types/private';
2424
import { DatabaseId, DatabaseInfo } from '../core/database_info';
2525
import { ListenOptions } from '../core/event_manager';
2626
import {
27-
OnlineComponentProvider,
2827
MemoryOfflineComponentProvider,
29-
OfflineComponentProvider
28+
OfflineComponentProvider,
29+
OnlineComponentProvider
3030
} from '../core/component_provider';
3131
import { FirestoreClient, PersistenceSettings } from '../core/firestore_client';
3232
import {
@@ -60,7 +60,6 @@ import { FieldPath, ResourcePath } from '../model/path';
6060
import { isServerTimestamp } from '../model/server_timestamps';
6161
import { refValue } from '../model/values';
6262
import { debugAssert, fail } from '../util/assert';
63-
import { AsyncObserver } from '../util/async_observer';
6463
import { AsyncQueue } from '../util/async_queue';
6564
import { Code, FirestoreError } from '../util/error';
6665
import {
@@ -485,16 +484,15 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
485484
this.ensureClientConfigured();
486485

487486
if (isPartialObserver(arg)) {
488-
return addSnapshotsInSyncListener(
489-
this._firestoreClient!,
487+
return this._firestoreClient!.addSnapshotsInSyncListener(
490488
arg as PartialObserver<void>
491489
);
492490
} else {
493491
validateArgType('Firestore.onSnapshotsInSync', 'function', 1, arg);
494492
const observer: PartialObserver<void> = {
495493
next: arg as () => void
496494
};
497-
return addSnapshotsInSyncListener(this._firestoreClient!, observer);
495+
return this._firestoreClient!.addSnapshotsInSyncListener(observer);
498496
}
499497
}
500498

@@ -686,29 +684,6 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
686684
}
687685
}
688686

689-
/** Registers the listener for onSnapshotsInSync() */
690-
export function addSnapshotsInSyncListener(
691-
firestoreClient: FirestoreClient,
692-
observer: PartialObserver<void>
693-
): Unsubscribe {
694-
const errHandler = (err: Error): void => {
695-
throw fail('Uncaught Error in onSnapshotsInSync');
696-
};
697-
const asyncObserver = new AsyncObserver<void>({
698-
next: () => {
699-
if (observer.next) {
700-
observer.next();
701-
}
702-
},
703-
error: errHandler
704-
});
705-
firestoreClient.addSnapshotsInSyncListener(asyncObserver);
706-
return () => {
707-
asyncObserver.mute();
708-
firestoreClient.removeSnapshotsInSyncListener(asyncObserver);
709-
};
710-
}
711-
712687
/**
713688
* A reference to a transaction.
714689
*/
@@ -1183,7 +1158,7 @@ export class DocumentReference<T = firestore.DocumentData>
11831158
1,
11841159
4
11851160
);
1186-
let options: firestore.SnapshotListenOptions = {
1161+
let options: ListenOptions = {
11871162
includeMetadataChanges: false
11881163
};
11891164
let currArg = 0;
@@ -1248,9 +1223,8 @@ export class DocumentReference<T = firestore.DocumentData>
12481223
complete: args[currArg + 2] as CompleteFn
12491224
};
12501225

1251-
return addDocSnapshotListener(
1252-
this._firestoreClient,
1253-
this._key,
1226+
return this._firestoreClient.listen(
1227+
newQueryForPath(this._key.path),
12541228
internalOptions,
12551229
observer
12561230
);
@@ -1312,40 +1286,6 @@ export class DocumentReference<T = firestore.DocumentData>
13121286
}
13131287
}
13141288

1315-
/** Registers an internal snapshot listener for `ref`. */
1316-
export function addDocSnapshotListener(
1317-
firestoreClient: FirestoreClient,
1318-
key: DocumentKey,
1319-
options: ListenOptions,
1320-
observer: PartialObserver<ViewSnapshot>
1321-
): Unsubscribe {
1322-
let errHandler = (err: Error): void => {
1323-
console.error('Uncaught Error in onSnapshot:', err);
1324-
};
1325-
if (observer.error) {
1326-
errHandler = observer.error.bind(observer);
1327-
}
1328-
1329-
const asyncObserver = new AsyncObserver<ViewSnapshot>({
1330-
next: snapshot => {
1331-
if (observer.next) {
1332-
observer.next(snapshot);
1333-
}
1334-
},
1335-
error: errHandler
1336-
});
1337-
const internalListener = firestoreClient.listen(
1338-
newQueryForPath(key.path),
1339-
asyncObserver,
1340-
options
1341-
);
1342-
1343-
return () => {
1344-
asyncObserver.mute();
1345-
firestoreClient.unlisten(internalListener);
1346-
};
1347-
}
1348-
13491289
/**
13501290
* Retrieves a latency-compensated document from the backend via a
13511291
* SnapshotListener.
@@ -1356,9 +1296,8 @@ export function getDocViaSnapshotListener(
13561296
options?: firestore.GetOptions
13571297
): Promise<ViewSnapshot> {
13581298
const result = new Deferred<ViewSnapshot>();
1359-
const unlisten = addDocSnapshotListener(
1360-
firestoreClient,
1361-
key,
1299+
const unlisten = firestoreClient.listen(
1300+
newQueryForPath(key.path),
13621301
{
13631302
includeMetadataChanges: true,
13641303
waitForSyncWhenOnline: true
@@ -2159,7 +2098,7 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {
21592098

21602099
onSnapshot(...args: unknown[]): Unsubscribe {
21612100
validateBetweenNumberOfArgs('Query.onSnapshot', arguments, 1, 4);
2162-
let options: firestore.SnapshotListenOptions = {};
2101+
let options: ListenOptions = {};
21632102
let currArg = 0;
21642103
if (
21652104
typeof args[currArg] === 'object' &&
@@ -2220,12 +2159,7 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {
22202159

22212160
validateHasExplicitOrderByForLimitToLast(this._query);
22222161
const firestoreClient = this.firestore.ensureClientConfigured();
2223-
return addQuerySnapshotListener(
2224-
firestoreClient,
2225-
this._query,
2226-
options,
2227-
observer
2228-
);
2162+
return firestoreClient.listen(this._query, options, observer);
22292163
}
22302164

22312165
get(options?: firestore.GetOptions): Promise<firestore.QuerySnapshot<T>> {
@@ -2254,8 +2188,7 @@ export function getDocsViaSnapshotListener(
22542188
options?: firestore.GetOptions
22552189
): Promise<ViewSnapshot> {
22562190
const result = new Deferred<ViewSnapshot>();
2257-
const unlisten = addQuerySnapshotListener(
2258-
firestore,
2191+
const unlisten = firestore.listen(
22592192
query,
22602193
{
22612194
includeMetadataChanges: true,
@@ -2287,35 +2220,6 @@ export function getDocsViaSnapshotListener(
22872220
return result.promise;
22882221
}
22892222

2290-
/** Registers an internal snapshot listener for `query`. */
2291-
export function addQuerySnapshotListener(
2292-
firestore: FirestoreClient,
2293-
query: InternalQuery,
2294-
options: ListenOptions,
2295-
observer: PartialObserver<ViewSnapshot>
2296-
): Unsubscribe {
2297-
let errHandler = (err: Error): void => {
2298-
console.error('Uncaught Error in onSnapshot:', err);
2299-
};
2300-
if (observer.error) {
2301-
errHandler = observer.error.bind(observer);
2302-
}
2303-
const asyncObserver = new AsyncObserver<ViewSnapshot>({
2304-
next: (result: ViewSnapshot): void => {
2305-
if (observer.next) {
2306-
observer.next(result);
2307-
}
2308-
},
2309-
error: errHandler
2310-
});
2311-
2312-
const internalListener = firestore.listen(query, asyncObserver, options);
2313-
return (): void => {
2314-
asyncObserver.mute();
2315-
firestore.unlisten(internalListener);
2316-
};
2317-
}
2318-
23192223
export class QuerySnapshot<T = firestore.DocumentData>
23202224
implements firestore.QuerySnapshot<T> {
23212225
private _cachedChanges: Array<firestore.DocumentChange<T>> | null = null;

packages/firestore/src/core/firestore_client.ts

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ import {
3535
} from './event_manager';
3636
import { SyncEngine } from './sync_engine';
3737
import { View } from './view';
38-
3938
import { SharedClientState } from '../local/shared_client_state';
4039
import { AutoId } from '../util/misc';
4140
import { DatabaseId, DatabaseInfo } from './database_info';
@@ -47,6 +46,7 @@ import {
4746
MemoryOfflineComponentProvider,
4847
OfflineComponentProvider
4948
} from './component_provider';
49+
import { AsyncObserver } from '../util/async_observer';
5050

5151
const LOG_TAG = 'FirestoreClient';
5252
const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
@@ -391,24 +391,17 @@ export class FirestoreClient {
391391

392392
listen(
393393
query: Query,
394-
observer: Observer<ViewSnapshot>,
395-
options: ListenOptions
396-
): QueryListener {
394+
options: ListenOptions,
395+
observer: Partial<Observer<ViewSnapshot>>
396+
): () => void {
397397
this.verifyNotTerminated();
398-
const listener = new QueryListener(query, observer, options);
398+
const wrappedObserver = new AsyncObserver(observer);
399+
const listener = new QueryListener(query, wrappedObserver, options);
399400
this.asyncQueue.enqueueAndForget(() => this.eventMgr.listen(listener));
400-
return listener;
401-
}
402-
403-
unlisten(listener: QueryListener): void {
404-
// Checks for termination but does not raise error, allowing unlisten after
405-
// termination to be a no-op.
406-
if (this.clientTerminated) {
407-
return;
408-
}
409-
this.asyncQueue.enqueueAndForget(() => {
410-
return this.eventMgr.unlisten(listener);
411-
});
401+
return () => {
402+
wrappedObserver.mute();
403+
this.asyncQueue.enqueueAndForget(() => this.eventMgr.unlisten(listener));
404+
};
412405
}
413406

414407
async getDocumentFromLocalCache(
@@ -486,24 +479,18 @@ export class FirestoreClient {
486479
return this.databaseInfo.databaseId;
487480
}
488481

489-
addSnapshotsInSyncListener(observer: Observer<void>): void {
482+
addSnapshotsInSyncListener(observer: Partial<Observer<void>>): () => void {
490483
this.verifyNotTerminated();
491-
this.asyncQueue.enqueueAndForget(() => {
492-
this.eventMgr.addSnapshotsInSyncListener(observer);
493-
return Promise.resolve();
494-
});
495-
}
496-
497-
removeSnapshotsInSyncListener(observer: Observer<void>): void {
498-
// Checks for shutdown but does not raise error, allowing remove after
499-
// shutdown to be a no-op.
500-
if (this.clientTerminated) {
501-
return;
502-
}
503-
this.asyncQueue.enqueueAndForget(() => {
504-
this.eventMgr.removeSnapshotsInSyncListener(observer);
505-
return Promise.resolve();
506-
});
484+
const wrappedObserver = new AsyncObserver(observer);
485+
this.asyncQueue.enqueueAndForget(async () =>
486+
this.eventMgr.addSnapshotsInSyncListener(wrappedObserver)
487+
);
488+
return () => {
489+
wrappedObserver.mute();
490+
this.asyncQueue.enqueueAndForget(async () =>
491+
this.eventMgr.removeSnapshotsInSyncListener(wrappedObserver)
492+
);
493+
};
507494
}
508495

509496
get clientTerminated(): boolean {

packages/firestore/src/util/async_observer.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,20 @@ export class AsyncObserver<T> implements Observer<T> {
3030
*/
3131
private muted = false;
3232

33-
constructor(private observer: Observer<T>) {}
33+
constructor(private observer: Partial<Observer<T>>) {}
3434

3535
next(value: T): void {
36-
this.scheduleEvent(this.observer.next, value);
36+
if (this.observer.next) {
37+
this.scheduleEvent(this.observer.next, value);
38+
}
3739
}
3840

3941
error(error: Error): void {
40-
this.scheduleEvent(this.observer.error, error);
42+
if (this.observer.error) {
43+
this.scheduleEvent(this.observer.error, error);
44+
} else {
45+
console.error('Uncaught Error in snapshot listener:', error);
46+
}
4147
}
4248

4349
mute(): void {

0 commit comments

Comments
 (0)