Skip to content

Commit 68802a3

Browse files
WIP
1 parent 2fa0353 commit 68802a3

File tree

8 files changed

+716
-598
lines changed

8 files changed

+716
-598
lines changed

packages/firestore/exp/dependencies.json

Lines changed: 139 additions & 97 deletions
Large diffs are not rendered by default.

packages/firestore/src/core/event_manager.ts

Lines changed: 79 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,12 @@ import { debugAssert } from '../util/assert';
1919
import { EventHandler } from '../util/misc';
2020
import { ObjectMap } from '../util/obj_map';
2121
import { canonifyQuery, Query, queryEquals, stringifyQuery } from './query';
22-
import { SyncEngine, SyncEngineListener } from './sync_engine';
22+
import {
23+
SyncEngine,
24+
SyncEngineListener,
25+
listen as syncEngineListen,
26+
unlisten as syncEngineUnlisten
27+
} from './sync_engine';
2328
import { OnlineState } from './types';
2429
import { ChangeType, DocumentViewChange, ViewSnapshot } from './view_snapshot';
2530
import { wrapInUserErrorIfRecoverable } from '../util/async_queue';
@@ -47,79 +52,19 @@ export interface Observer<T> {
4752
* backend.
4853
*/
4954
export class EventManager implements SyncEngineListener {
50-
private queries = new ObjectMap<Query, QueryListenersInfo>(
55+
queries = new ObjectMap<Query, QueryListenersInfo>(
5156
q => canonifyQuery(q),
5257
queryEquals
5358
);
5459

55-
private onlineState = OnlineState.Unknown;
60+
onlineState = OnlineState.Unknown;
5661

5762
private snapshotsInSyncListeners: Set<Observer<void>> = new Set();
5863

59-
constructor(private syncEngine: SyncEngine) {
64+
constructor(public syncEngine: SyncEngine) {
6065
this.syncEngine.subscribe(this);
6166
}
6267

63-
async listen(listener: QueryListener): Promise<void> {
64-
const query = listener.query;
65-
let firstListen = false;
66-
67-
let queryInfo = this.queries.get(query);
68-
if (!queryInfo) {
69-
firstListen = true;
70-
queryInfo = new QueryListenersInfo();
71-
}
72-
73-
if (firstListen) {
74-
try {
75-
queryInfo.viewSnap = await this.syncEngine.listen(query);
76-
} catch (e) {
77-
const firestoreError = wrapInUserErrorIfRecoverable(
78-
e,
79-
`Initialization of query '${stringifyQuery(listener.query)}' failed`
80-
);
81-
listener.onError(firestoreError);
82-
return;
83-
}
84-
}
85-
86-
this.queries.set(query, queryInfo);
87-
queryInfo.listeners.push(listener);
88-
89-
// Run global snapshot listeners if a consistent snapshot has been emitted.
90-
const raisedEvent = listener.applyOnlineStateChange(this.onlineState);
91-
debugAssert(
92-
!raisedEvent,
93-
"applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
94-
);
95-
96-
if (queryInfo.viewSnap) {
97-
const raisedEvent = listener.onViewSnapshot(queryInfo.viewSnap);
98-
if (raisedEvent) {
99-
this.raiseSnapshotsInSyncEvent();
100-
}
101-
}
102-
}
103-
104-
async unlisten(listener: QueryListener): Promise<void> {
105-
const query = listener.query;
106-
let lastListen = false;
107-
108-
const queryInfo = this.queries.get(query);
109-
if (queryInfo) {
110-
const i = queryInfo.listeners.indexOf(listener);
111-
if (i >= 0) {
112-
queryInfo.listeners.splice(i, 1);
113-
lastListen = queryInfo.listeners.length === 0;
114-
}
115-
}
116-
117-
if (lastListen) {
118-
this.queries.delete(query);
119-
return this.syncEngine.unlisten(query);
120-
}
121-
}
122-
12368
onWatchChange(viewSnaps: ViewSnapshot[]): void {
12469
let raisedEvent = false;
12570
for (const viewSnap of viewSnaps) {
@@ -180,7 +125,7 @@ export class EventManager implements SyncEngineListener {
180125
}
181126

182127
// Call all global snapshot listeners that have been set.
183-
private raiseSnapshotsInSyncEvent(): void {
128+
raiseSnapshotsInSyncEvent(): void {
184129
this.snapshotsInSyncListeners.forEach(observer => {
185130
observer.next();
186131
});
@@ -357,3 +302,72 @@ export class QueryListener {
357302
this.queryObserver.next(snap);
358303
}
359304
}
305+
306+
export async function listen(
307+
eventManager: EventManager,
308+
listener: QueryListener
309+
): Promise<void> {
310+
const query = listener.query;
311+
let firstListen = false;
312+
313+
let queryInfo = eventManager.queries.get(query);
314+
if (!queryInfo) {
315+
firstListen = true;
316+
queryInfo = new QueryListenersInfo();
317+
}
318+
319+
if (firstListen) {
320+
try {
321+
queryInfo.viewSnap = await syncEngineListen(
322+
eventManager.syncEngine,
323+
query
324+
);
325+
} catch (e) {
326+
const firestoreError = wrapInUserErrorIfRecoverable(
327+
e,
328+
`Initialization of query '${stringifyQuery(listener.query)}' failed`
329+
);
330+
listener.onError(firestoreError);
331+
return;
332+
}
333+
}
334+
335+
eventManager.queries.set(query, queryInfo);
336+
queryInfo.listeners.push(listener);
337+
338+
// Run global snapshot listeners if a consistent snapshot has been emitted.
339+
const raisedEvent = listener.applyOnlineStateChange(eventManager.onlineState);
340+
debugAssert(
341+
!raisedEvent,
342+
"applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
343+
);
344+
345+
if (queryInfo.viewSnap) {
346+
const raisedEvent = listener.onViewSnapshot(queryInfo.viewSnap);
347+
if (raisedEvent) {
348+
eventManager.raiseSnapshotsInSyncEvent();
349+
}
350+
}
351+
}
352+
353+
export async function unlisten(
354+
eventManager: EventManager,
355+
listener: QueryListener
356+
): Promise<void> {
357+
const query = listener.query;
358+
let lastListen = false;
359+
360+
const queryInfo = eventManager.queries.get(query);
361+
if (queryInfo) {
362+
const i = queryInfo.listeners.indexOf(listener);
363+
if (i >= 0) {
364+
queryInfo.listeners.splice(i, 1);
365+
lastListen = queryInfo.listeners.length === 0;
366+
}
367+
}
368+
369+
if (lastListen) {
370+
eventManager.queries.delete(query);
371+
return syncEngineUnlisten(eventManager.syncEngine, query);
372+
}
373+
}

packages/firestore/src/core/firestore_client.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ import {
3333
EventManager,
3434
ListenOptions,
3535
Observer,
36-
QueryListener
36+
QueryListener,
37+
listen,
38+
unlisten
3739
} from './event_manager';
3840
import { SyncEngine } from './sync_engine';
3941
import { View } from './view';
@@ -403,10 +405,10 @@ export class FirestoreClient {
403405
this.verifyNotTerminated();
404406
const wrappedObserver = new AsyncObserver(observer);
405407
const listener = new QueryListener(query, wrappedObserver, options);
406-
this.asyncQueue.enqueueAndForget(() => this.eventMgr.listen(listener));
408+
this.asyncQueue.enqueueAndForget(() => listen(this.eventMgr, listener));
407409
return () => {
408410
wrappedObserver.mute();
409-
this.asyncQueue.enqueueAndForget(() => this.eventMgr.unlisten(listener));
411+
this.asyncQueue.enqueueAndForget(() => unlisten(this.eventMgr, listener));
410412
};
411413
}
412414

@@ -569,10 +571,10 @@ export function enqueueListen(
569571
): Unsubscribe {
570572
const wrappedObserver = new AsyncObserver(observer);
571573
const listener = new QueryListener(query, wrappedObserver, options);
572-
asyncQueue.enqueueAndForget(() => eventManger.listen(listener));
574+
asyncQueue.enqueueAndForget(() => listen(eventManger, listener));
573575
return () => {
574576
wrappedObserver.mute();
575-
asyncQueue.enqueueAndForget(() => eventManger.unlisten(listener));
577+
asyncQueue.enqueueAndForget(() => unlisten(eventManger, listener));
576578
};
577579
}
578580

0 commit comments

Comments
 (0)