Skip to content

Commit e6e97b8

Browse files
committed
Merge remote-tracking branch 'origin/master' into ErrorToFirestoreError
2 parents df809d2 + b0a75ac commit e6e97b8

File tree

5 files changed

+220
-140
lines changed

5 files changed

+220
-140
lines changed

packages/firestore/src/core/component_provider.ts

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,13 @@ import {
3838
SyncEngine
3939
} from './sync_engine';
4040
import { RemoteStore } from '../remote/remote_store';
41-
import { EventManager } from './event_manager';
41+
import {
42+
EventManager,
43+
newEventManager,
44+
eventManagerOnOnlineStateChange,
45+
eventManagerOnWatchChange,
46+
eventManagerOnWatchError
47+
} from './event_manager';
4248
import { AsyncQueue } from '../util/async_queue';
4349
import { DatabaseId, DatabaseInfo } from './database_info';
4450
import { Datastore, newDatastore } from '../remote/datastore';
@@ -334,7 +340,14 @@ export class OnlineComponentProvider {
334340
this.syncEngine = this.createSyncEngine(cfg);
335341
this.eventManager = this.createEventManager(cfg);
336342

337-
this.syncEngine.subscribe(this.eventManager);
343+
this.syncEngine.subscribe({
344+
onWatchChange: eventManagerOnWatchChange.bind(null, this.eventManager),
345+
onWatchError: eventManagerOnWatchError.bind(null, this.eventManager),
346+
onOnlineStateChange: eventManagerOnOnlineStateChange.bind(
347+
null,
348+
this.eventManager
349+
)
350+
});
338351

339352
this.sharedClientState.onlineStateHandler = onlineState =>
340353
applyOnlineStateChange(
@@ -353,7 +366,7 @@ export class OnlineComponentProvider {
353366
}
354367

355368
createEventManager(cfg: ComponentConfiguration): EventManager {
356-
return new EventManager();
369+
return newEventManager();
357370
}
358371

359372
createDatastore(cfg: ComponentConfiguration): Datastore {

packages/firestore/src/core/event_manager.ts

Lines changed: 151 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
* limitations under the License.
1616
*/
1717

18-
import { debugAssert } from '../util/assert';
18+
import { debugAssert, debugCast } from '../util/assert';
1919
import { FirestoreError } from '../util/error';
2020
import { EventHandler } from '../util/misc';
2121
import { ObjectMap } from '../util/obj_map';
2222
import { canonifyQuery, Query, queryEquals, stringifyQuery } from './query';
23-
import { SyncEngineListener } from './sync_engine';
2423
import { OnlineState } from './types';
2524
import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
2625
import { wrapInUserErrorIfRecoverable } from '../util/async_queue';
@@ -51,150 +50,196 @@ export interface Observer<T> {
5150
* assigned to SyncEngine's `listen()` and `unlisten()` API before usage. This
5251
* allows users to tree-shake the Watch logic.
5352
*/
54-
export class EventManager implements SyncEngineListener {
55-
private queries = new ObjectMap<Query, QueryListenersInfo>(
53+
export interface EventManager {
54+
onListen?: (query: Query) => Promise<ViewSnapshot>;
55+
onUnlisten?: (query: Query) => Promise<void>;
56+
}
57+
58+
export function newEventManager(): EventManager {
59+
return new EventManagerImpl();
60+
}
61+
62+
export class EventManagerImpl implements EventManager {
63+
queries = new ObjectMap<Query, QueryListenersInfo>(
5664
q => canonifyQuery(q),
5765
queryEquals
5866
);
5967

60-
private onlineState = OnlineState.Unknown;
68+
onlineState = OnlineState.Unknown;
6169

62-
private snapshotsInSyncListeners: Set<Observer<void>> = new Set();
70+
snapshotsInSyncListeners: Set<Observer<void>> = new Set();
6371

6472
/** Callback invoked when a Query is first listen to. */
6573
onListen?: (query: Query) => Promise<ViewSnapshot>;
6674
/** Callback invoked once all listeners to a Query are removed. */
6775
onUnlisten?: (query: Query) => Promise<void>;
76+
}
6877

69-
async listen(listener: QueryListener): Promise<void> {
70-
debugAssert(!!this.onListen, 'onListen not set');
71-
const query = listener.query;
72-
let firstListen = false;
73-
74-
let queryInfo = this.queries.get(query);
75-
if (!queryInfo) {
76-
firstListen = true;
77-
queryInfo = new QueryListenersInfo();
78-
}
78+
export async function eventManagerListen(
79+
eventManager: EventManager,
80+
listener: QueryListener
81+
): Promise<void> {
82+
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
7983

80-
if (firstListen) {
81-
try {
82-
queryInfo.viewSnap = await this.onListen(query);
83-
} catch (e) {
84-
const firestoreError = wrapInUserErrorIfRecoverable(
85-
e,
86-
`Initialization of query '${stringifyQuery(listener.query)}' failed`
87-
);
88-
listener.onError(firestoreError);
89-
return;
90-
}
91-
}
84+
debugAssert(!!eventManagerImpl.onListen, 'onListen not set');
85+
const query = listener.query;
86+
let firstListen = false;
9287

93-
this.queries.set(query, queryInfo);
94-
queryInfo.listeners.push(listener);
95-
96-
// Run global snapshot listeners if a consistent snapshot has been emitted.
97-
const raisedEvent = listener.applyOnlineStateChange(this.onlineState);
98-
debugAssert(
99-
!raisedEvent,
100-
"applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
101-
);
88+
let queryInfo = eventManagerImpl.queries.get(query);
89+
if (!queryInfo) {
90+
firstListen = true;
91+
queryInfo = new QueryListenersInfo();
92+
}
10293

103-
if (queryInfo.viewSnap) {
104-
const raisedEvent = listener.onViewSnapshot(queryInfo.viewSnap);
105-
if (raisedEvent) {
106-
this.raiseSnapshotsInSyncEvent();
107-
}
94+
if (firstListen) {
95+
try {
96+
queryInfo.viewSnap = await eventManagerImpl.onListen(query);
97+
} catch (e) {
98+
const firestoreError = wrapInUserErrorIfRecoverable(
99+
e,
100+
`Initialization of query '${stringifyQuery(listener.query)}' failed`
101+
);
102+
listener.onError(firestoreError);
103+
return;
108104
}
109105
}
110106

111-
async unlisten(listener: QueryListener): Promise<void> {
112-
debugAssert(!!this.onUnlisten, 'onUnlisten not set');
113-
const query = listener.query;
114-
let lastListen = false;
115-
116-
const queryInfo = this.queries.get(query);
117-
if (queryInfo) {
118-
const i = queryInfo.listeners.indexOf(listener);
119-
if (i >= 0) {
120-
queryInfo.listeners.splice(i, 1);
121-
lastListen = queryInfo.listeners.length === 0;
122-
}
123-
}
107+
eventManagerImpl.queries.set(query, queryInfo);
108+
queryInfo.listeners.push(listener);
124109

125-
if (lastListen) {
126-
this.queries.delete(query);
127-
return this.onUnlisten(query);
128-
}
129-
}
110+
// Run global snapshot listeners if a consistent snapshot has been emitted.
111+
const raisedEvent = listener.applyOnlineStateChange(
112+
eventManagerImpl.onlineState
113+
);
114+
debugAssert(
115+
!raisedEvent,
116+
"applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
117+
);
130118

131-
onWatchChange(viewSnaps: ViewSnapshot[]): void {
132-
let raisedEvent = false;
133-
for (const viewSnap of viewSnaps) {
134-
const query = viewSnap.query;
135-
const queryInfo = this.queries.get(query);
136-
if (queryInfo) {
137-
for (const listener of queryInfo.listeners) {
138-
if (listener.onViewSnapshot(viewSnap)) {
139-
raisedEvent = true;
140-
}
141-
}
142-
queryInfo.viewSnap = viewSnap;
143-
}
144-
}
119+
if (queryInfo.viewSnap) {
120+
const raisedEvent = listener.onViewSnapshot(queryInfo.viewSnap);
145121
if (raisedEvent) {
146-
this.raiseSnapshotsInSyncEvent();
122+
raiseSnapshotsInSyncEvent(eventManagerImpl);
147123
}
148124
}
125+
}
149126

150-
onWatchError(query: Query, error: FirestoreError): void {
151-
const queryInfo = this.queries.get(query);
152-
if (queryInfo) {
153-
for (const listener of queryInfo.listeners) {
154-
listener.onError(error);
155-
}
127+
export async function eventManagerUnlisten(
128+
eventManager: EventManager,
129+
listener: QueryListener
130+
): Promise<void> {
131+
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
132+
133+
debugAssert(!!eventManagerImpl.onUnlisten, 'onUnlisten not set');
134+
const query = listener.query;
135+
let lastListen = false;
136+
137+
const queryInfo = eventManagerImpl.queries.get(query);
138+
if (queryInfo) {
139+
const i = queryInfo.listeners.indexOf(listener);
140+
if (i >= 0) {
141+
queryInfo.listeners.splice(i, 1);
142+
lastListen = queryInfo.listeners.length === 0;
156143
}
144+
}
157145

158-
// Remove all listeners. NOTE: We don't need to call syncEngine.unlisten()
159-
// after an error.
160-
this.queries.delete(query);
146+
if (lastListen) {
147+
eventManagerImpl.queries.delete(query);
148+
return eventManagerImpl.onUnlisten(query);
161149
}
150+
}
162151

163-
onOnlineStateChange(onlineState: OnlineState): void {
164-
this.onlineState = onlineState;
165-
let raisedEvent = false;
166-
this.queries.forEach((_, queryInfo) => {
152+
export function eventManagerOnWatchChange(
153+
eventManager: EventManager,
154+
viewSnaps: ViewSnapshot[]
155+
): void {
156+
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
157+
158+
let raisedEvent = false;
159+
for (const viewSnap of viewSnaps) {
160+
const query = viewSnap.query;
161+
const queryInfo = eventManagerImpl.queries.get(query);
162+
if (queryInfo) {
167163
for (const listener of queryInfo.listeners) {
168-
// Run global snapshot listeners if a consistent snapshot has been emitted.
169-
if (listener.applyOnlineStateChange(onlineState)) {
164+
if (listener.onViewSnapshot(viewSnap)) {
170165
raisedEvent = true;
171166
}
172167
}
173-
});
174-
if (raisedEvent) {
175-
this.raiseSnapshotsInSyncEvent();
168+
queryInfo.viewSnap = viewSnap;
176169
}
177170
}
178-
179-
addSnapshotsInSyncListener(observer: Observer<void>): void {
180-
this.snapshotsInSyncListeners.add(observer);
181-
// Immediately fire an initial event, indicating all existing listeners
182-
// are in-sync.
183-
observer.next();
171+
if (raisedEvent) {
172+
raiseSnapshotsInSyncEvent(eventManagerImpl);
184173
}
174+
}
185175

186-
removeSnapshotsInSyncListener(observer: Observer<void>): void {
187-
this.snapshotsInSyncListeners.delete(observer);
176+
export function eventManagerOnWatchError(
177+
eventManager: EventManager,
178+
query: Query,
179+
error: FirestoreError
180+
): void {
181+
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
182+
183+
const queryInfo = eventManagerImpl.queries.get(query);
184+
if (queryInfo) {
185+
for (const listener of queryInfo.listeners) {
186+
listener.onError(error);
187+
}
188188
}
189189

190-
// Call all global snapshot listeners that have been set.
191-
private raiseSnapshotsInSyncEvent(): void {
192-
this.snapshotsInSyncListeners.forEach(observer => {
193-
observer.next();
194-
});
190+
// Remove all listeners. NOTE: We don't need to call syncEngine.unlisten()
191+
// after an error.
192+
eventManagerImpl.queries.delete(query);
193+
}
194+
195+
export function eventManagerOnOnlineStateChange(
196+
eventManager: EventManager,
197+
onlineState: OnlineState
198+
): void {
199+
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
200+
201+
eventManagerImpl.onlineState = onlineState;
202+
let raisedEvent = false;
203+
eventManagerImpl.queries.forEach((_, queryInfo) => {
204+
for (const listener of queryInfo.listeners) {
205+
// Run global snapshot listeners if a consistent snapshot has been emitted.
206+
if (listener.applyOnlineStateChange(onlineState)) {
207+
raisedEvent = true;
208+
}
209+
}
210+
});
211+
if (raisedEvent) {
212+
raiseSnapshotsInSyncEvent(eventManagerImpl);
195213
}
196214
}
197215

216+
export function addSnapshotsInSyncListener(
217+
eventManager: EventManager,
218+
observer: Observer<void>
219+
): void {
220+
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
221+
222+
eventManagerImpl.snapshotsInSyncListeners.add(observer);
223+
// Immediately fire an initial event, indicating all existing listeners
224+
// are in-sync.
225+
observer.next();
226+
}
227+
228+
export function removeSnapshotsInSyncListener(
229+
eventManager: EventManager,
230+
observer: Observer<void>
231+
): void {
232+
const eventManagerImpl = debugCast(eventManager, EventManagerImpl);
233+
eventManagerImpl.snapshotsInSyncListeners.delete(observer);
234+
}
235+
236+
// Call all global snapshot listeners that have been set.
237+
function raiseSnapshotsInSyncEvent(eventManagerImpl: EventManagerImpl): void {
238+
eventManagerImpl.snapshotsInSyncListeners.forEach(observer => {
239+
observer.next();
240+
});
241+
}
242+
198243
export interface ListenOptions {
199244
/** Raise events even when only the metadata changes */
200245
readonly includeMetadataChanges?: boolean;

0 commit comments

Comments
 (0)