Skip to content

Commit 829de71

Browse files
Simplify Listen API
1 parent 1fb82be commit 829de71

File tree

4 files changed

+58
-157
lines changed

4 files changed

+58
-157
lines changed

packages/firestore/exp/src/api/reference.ts

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@ import { debugAssert } from '../../../src/util/assert';
3131
import { cast } from '../../../lite/src/api/util';
3232
import { DocumentSnapshot, QuerySnapshot } from './snapshot';
3333
import {
34-
addDocSnapshotListener,
35-
addQuerySnapshotListener,
36-
addSnapshotsInSyncListener,
3734
applyFirestoreDataConverter,
3835
getDocsViaSnapshotListener,
3936
getDocViaSnapshotListener,
@@ -60,6 +57,8 @@ import {
6057
Unsubscribe
6158
} from '../../../src/api/observer';
6259
import { getFirestoreClient } from './components';
60+
import { AsyncObserver } from '../../../src/util/async_observer';
61+
import { newQueryForPath } from '../../../src/core/query';
6362

6463
export function getDoc<T>(
6564
reference: firestore.DocumentReference<T>
@@ -387,11 +386,10 @@ export function onSnapshot<T>(
387386
};
388387

389388
asyncObserver = getFirestoreClient(firestore).then(firestoreClient =>
390-
addDocSnapshotListener(
391-
firestoreClient,
392-
ref._key,
389+
firestoreClient.listen(
390+
newQueryForPath(ref._key.path),
393391
internalOptions,
394-
observer
392+
new AsyncObserver(observer)
395393
)
396394
);
397395
} else {
@@ -413,11 +411,10 @@ export function onSnapshot<T>(
413411
validateHasExplicitOrderByForLimitToLast(query._query);
414412

415413
asyncObserver = getFirestoreClient(firestore).then(firestoreClient =>
416-
addQuerySnapshotListener(
417-
firestoreClient,
414+
firestoreClient.listen(
418415
query._query,
419416
internalOptions,
420-
observer
417+
new AsyncObserver(observer)
421418
)
422419
);
423420
}
@@ -458,7 +455,7 @@ export function onSnapshotsInSync(
458455
const asyncObserver = getFirestoreClient(
459456
firestoreImpl
460457
).then(firestoreClient =>
461-
addSnapshotsInSyncListener(firestoreClient, observer)
458+
firestoreClient.addSnapshotsInSyncListener(new AsyncObserver(observer))
462459
);
463460

464461
// TODO(firestorexp): Add test that verifies that we don't raise a snapshot if

packages/firestore/src/api/database.ts

Lines changed: 19 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@ import * as api from '../protos/firestore_proto_api';
2222
import { FirebaseApp } from '@firebase/app-types';
2323
import { _FirebaseApp, FirebaseService } from '@firebase/app-types/private';
2424
import { DatabaseId, DatabaseInfo } from '../core/database_info';
25-
import { ListenOptions } from '../core/event_manager';
2625
import {
27-
OnlineComponentProvider,
2826
MemoryOfflineComponentProvider,
29-
OfflineComponentProvider
27+
OfflineComponentProvider,
28+
OnlineComponentProvider
3029
} from '../core/component_provider';
3130
import { FirestoreClient, PersistenceSettings } from '../core/firestore_client';
3231
import {
@@ -485,16 +484,17 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
485484
this.ensureClientConfigured();
486485

487486
if (isPartialObserver(arg)) {
488-
return addSnapshotsInSyncListener(
489-
this._firestoreClient!,
490-
arg as PartialObserver<void>
487+
return this._firestoreClient!.addSnapshotsInSyncListener(
488+
new AsyncObserver(arg as PartialObserver<void>)
491489
);
492490
} else {
493491
validateArgType('Firestore.onSnapshotsInSync', 'function', 1, arg);
494492
const observer: PartialObserver<void> = {
495493
next: arg as () => void
496494
};
497-
return addSnapshotsInSyncListener(this._firestoreClient!, observer);
495+
return this._firestoreClient!.addSnapshotsInSyncListener(
496+
new AsyncObserver(observer)
497+
);
498498
}
499499
}
500500

@@ -686,29 +686,6 @@ export class Firestore implements firestore.FirebaseFirestore, FirebaseService {
686686
}
687687
}
688688

689-
/** Registers the listener for onSnapshotsInSync() */
690-
export function addSnapshotsInSyncListener(
691-
firestoreClient: FirestoreClient,
692-
observer: PartialObserver<void>
693-
): Unsubscribe {
694-
const errHandler = (err: Error): void => {
695-
throw fail('Uncaught Error in onSnapshotsInSync');
696-
};
697-
const asyncObserver = new AsyncObserver<void>({
698-
next: () => {
699-
if (observer.next) {
700-
observer.next();
701-
}
702-
},
703-
error: errHandler
704-
});
705-
firestoreClient.addSnapshotsInSyncListener(asyncObserver);
706-
return () => {
707-
asyncObserver.mute();
708-
firestoreClient.removeSnapshotsInSyncListener(asyncObserver);
709-
};
710-
}
711-
712689
/**
713690
* A reference to a transaction.
714691
*/
@@ -1248,11 +1225,10 @@ export class DocumentReference<T = firestore.DocumentData>
12481225
complete: args[currArg + 2] as CompleteFn
12491226
};
12501227

1251-
return addDocSnapshotListener(
1252-
this._firestoreClient,
1253-
this._key,
1228+
return this._firestoreClient.listen(
1229+
newQueryForPath(this._key.path),
12541230
internalOptions,
1255-
observer
1231+
new AsyncObserver(observer)
12561232
);
12571233
}
12581234

@@ -1312,40 +1288,6 @@ export class DocumentReference<T = firestore.DocumentData>
13121288
}
13131289
}
13141290

1315-
/** Registers an internal snapshot listener for `ref`. */
1316-
export function addDocSnapshotListener(
1317-
firestoreClient: FirestoreClient,
1318-
key: DocumentKey,
1319-
options: ListenOptions,
1320-
observer: PartialObserver<ViewSnapshot>
1321-
): Unsubscribe {
1322-
let errHandler = (err: Error): void => {
1323-
console.error('Uncaught Error in onSnapshot:', err);
1324-
};
1325-
if (observer.error) {
1326-
errHandler = observer.error.bind(observer);
1327-
}
1328-
1329-
const asyncObserver = new AsyncObserver<ViewSnapshot>({
1330-
next: snapshot => {
1331-
if (observer.next) {
1332-
observer.next(snapshot);
1333-
}
1334-
},
1335-
error: errHandler
1336-
});
1337-
const internalListener = firestoreClient.listen(
1338-
newQueryForPath(key.path),
1339-
asyncObserver,
1340-
options
1341-
);
1342-
1343-
return () => {
1344-
asyncObserver.mute();
1345-
firestoreClient.unlisten(internalListener);
1346-
};
1347-
}
1348-
13491291
/**
13501292
* Retrieves a latency-compensated document from the backend via a
13511293
* SnapshotListener.
@@ -1356,14 +1298,13 @@ export function getDocViaSnapshotListener(
13561298
options?: firestore.GetOptions
13571299
): Promise<ViewSnapshot> {
13581300
const result = new Deferred<ViewSnapshot>();
1359-
const unlisten = addDocSnapshotListener(
1360-
firestoreClient,
1361-
key,
1301+
const unlisten = firestoreClient.listen(
1302+
newQueryForPath(key.path),
13621303
{
13631304
includeMetadataChanges: true,
13641305
waitForSyncWhenOnline: true
13651306
},
1366-
{
1307+
new AsyncObserver({
13671308
next: (snap: ViewSnapshot) => {
13681309
// Remove query first before passing event to user to avoid
13691310
// user actions affecting the now stale query.
@@ -1404,7 +1345,7 @@ export function getDocViaSnapshotListener(
14041345
}
14051346
},
14061347
error: e => result.reject(e)
1407-
}
1348+
})
14081349
);
14091350
return result.promise;
14101351
}
@@ -2220,11 +2161,10 @@ export class Query<T = firestore.DocumentData> implements firestore.Query<T> {
22202161

22212162
validateHasExplicitOrderByForLimitToLast(this._query);
22222163
const firestoreClient = this.firestore.ensureClientConfigured();
2223-
return addQuerySnapshotListener(
2224-
firestoreClient,
2164+
return firestoreClient.listen(
22252165
this._query,
22262166
options,
2227-
observer
2167+
new AsyncObserver(observer)
22282168
);
22292169
}
22302170

@@ -2254,14 +2194,13 @@ export function getDocsViaSnapshotListener(
22542194
options?: firestore.GetOptions
22552195
): Promise<ViewSnapshot> {
22562196
const result = new Deferred<ViewSnapshot>();
2257-
const unlisten = addQuerySnapshotListener(
2258-
firestore,
2197+
const unlisten = firestore.listen(
22592198
query,
22602199
{
22612200
includeMetadataChanges: true,
22622201
waitForSyncWhenOnline: true
22632202
},
2264-
{
2203+
new AsyncObserver({
22652204
next: snapshot => {
22662205
// Remove query first before passing event to user to avoid
22672206
// user actions affecting the now stale query.
@@ -2282,40 +2221,11 @@ export function getDocsViaSnapshotListener(
22822221
}
22832222
},
22842223
error: e => result.reject(e)
2285-
}
2224+
})
22862225
);
22872226
return result.promise;
22882227
}
22892228

2290-
/** Registers an internal snapshot listener for `query`. */
2291-
export function addQuerySnapshotListener(
2292-
firestore: FirestoreClient,
2293-
query: InternalQuery,
2294-
options: ListenOptions,
2295-
observer: PartialObserver<ViewSnapshot>
2296-
): Unsubscribe {
2297-
let errHandler = (err: Error): void => {
2298-
console.error('Uncaught Error in onSnapshot:', err);
2299-
};
2300-
if (observer.error) {
2301-
errHandler = observer.error.bind(observer);
2302-
}
2303-
const asyncObserver = new AsyncObserver<ViewSnapshot>({
2304-
next: (result: ViewSnapshot): void => {
2305-
if (observer.next) {
2306-
observer.next(result);
2307-
}
2308-
},
2309-
error: errHandler
2310-
});
2311-
2312-
const internalListener = firestore.listen(query, asyncObserver, options);
2313-
return (): void => {
2314-
asyncObserver.mute();
2315-
firestore.unlisten(internalListener);
2316-
};
2317-
}
2318-
23192229
export class QuerySnapshot<T = firestore.DocumentData>
23202230
implements firestore.QuerySnapshot<T> {
23212231
private _cachedChanges: Array<firestore.DocumentChange<T>> | null = null;

packages/firestore/src/core/firestore_client.ts

Lines changed: 21 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,12 @@ import { Query } from './query';
4343
import { Transaction } from './transaction';
4444
import { ViewSnapshot } from './view_snapshot';
4545
import {
46-
OnlineComponentProvider,
4746
MemoryOfflineComponentProvider,
48-
OfflineComponentProvider
47+
OfflineComponentProvider,
48+
OnlineComponentProvider
4949
} from './component_provider';
50+
import { Unsubscribe } from '../api/observer';
51+
import { AsyncObserver } from '../util/async_observer';
5052

5153
const LOG_TAG = 'FirestoreClient';
5254
const MAX_CONCURRENT_LIMBO_RESOLUTIONS = 100;
@@ -391,24 +393,16 @@ export class FirestoreClient {
391393

392394
listen(
393395
query: Query,
394-
observer: Observer<ViewSnapshot>,
395-
options: ListenOptions
396-
): QueryListener {
396+
options: ListenOptions,
397+
observer: AsyncObserver<ViewSnapshot>
398+
): Unsubscribe {
397399
this.verifyNotTerminated();
398400
const listener = new QueryListener(query, observer, options);
399401
this.asyncQueue.enqueueAndForget(() => this.eventMgr.listen(listener));
400-
return listener;
401-
}
402-
403-
unlisten(listener: QueryListener): void {
404-
// Checks for termination but does not raise error, allowing unlisten after
405-
// termination to be a no-op.
406-
if (this.clientTerminated) {
407-
return;
408-
}
409-
this.asyncQueue.enqueueAndForget(() => {
410-
return this.eventMgr.unlisten(listener);
411-
});
402+
return () => {
403+
observer.mute();
404+
this.asyncQueue.enqueueAndForget(() => this.eventMgr.unlisten(listener));
405+
};
412406
}
413407

414408
async getDocumentFromLocalCache(
@@ -486,24 +480,17 @@ export class FirestoreClient {
486480
return this.databaseInfo.databaseId;
487481
}
488482

489-
addSnapshotsInSyncListener(observer: Observer<void>): void {
483+
addSnapshotsInSyncListener(observer: AsyncObserver<void>): Unsubscribe {
490484
this.verifyNotTerminated();
491-
this.asyncQueue.enqueueAndForget(() => {
492-
this.eventMgr.addSnapshotsInSyncListener(observer);
493-
return Promise.resolve();
494-
});
495-
}
496-
497-
removeSnapshotsInSyncListener(observer: Observer<void>): void {
498-
// Checks for shutdown but does not raise error, allowing remove after
499-
// shutdown to be a no-op.
500-
if (this.clientTerminated) {
501-
return;
502-
}
503-
this.asyncQueue.enqueueAndForget(() => {
504-
this.eventMgr.removeSnapshotsInSyncListener(observer);
505-
return Promise.resolve();
506-
});
485+
this.asyncQueue.enqueueAndForget(async () =>
486+
this.eventMgr.addSnapshotsInSyncListener(observer)
487+
);
488+
return () => {
489+
observer.mute();
490+
this.asyncQueue.enqueueAndForget(async () =>
491+
this.eventMgr.removeSnapshotsInSyncListener(observer)
492+
);
493+
};
507494
}
508495

509496
get clientTerminated(): boolean {

packages/firestore/src/util/async_observer.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import { Observer } from '../core/event_manager';
1919
import { EventHandler } from './misc';
20+
import { PartialObserver } from '../api/observer';
2021

2122
/*
2223
* A wrapper implementation of Observer<T> that will dispatch events
@@ -30,14 +31,20 @@ export class AsyncObserver<T> implements Observer<T> {
3031
*/
3132
private muted = false;
3233

33-
constructor(private observer: Observer<T>) {}
34+
constructor(private observer: PartialObserver<T>) {}
3435

3536
next(value: T): void {
36-
this.scheduleEvent(this.observer.next, value);
37+
if (this.observer.next) {
38+
this.scheduleEvent(this.observer.next, value);
39+
}
3740
}
3841

3942
error(error: Error): void {
40-
this.scheduleEvent(this.observer.error, error);
43+
if (this.observer.error) {
44+
this.scheduleEvent(this.observer.error, error);
45+
} else {
46+
console.error('Uncaught Error in snapshot listener:', error);
47+
}
4148
}
4249

4350
mute(): void {

0 commit comments

Comments
 (0)