@@ -19,7 +19,12 @@ import { debugAssert } from '../util/assert';
19
19
import { EventHandler } from '../util/misc' ;
20
20
import { ObjectMap } from '../util/obj_map' ;
21
21
import { canonifyQuery , Query , queryEquals , stringifyQuery } from './query' ;
22
- import { SyncEngine , SyncEngineListener } from './sync_engine' ;
22
+ import {
23
+ SyncEngine ,
24
+ SyncEngineListener ,
25
+ listen as syncEngineListen ,
26
+ unlisten as syncEngineUnlisten
27
+ } from './sync_engine' ;
23
28
import { OnlineState } from './types' ;
24
29
import { ChangeType , DocumentViewChange , ViewSnapshot } from './view_snapshot' ;
25
30
import { wrapInUserErrorIfRecoverable } from '../util/async_queue' ;
@@ -47,79 +52,19 @@ export interface Observer<T> {
47
52
* backend.
48
53
*/
49
54
export class EventManager implements SyncEngineListener {
50
- private queries = new ObjectMap < Query , QueryListenersInfo > (
55
+ queries = new ObjectMap < Query , QueryListenersInfo > (
51
56
q => canonifyQuery ( q ) ,
52
57
queryEquals
53
58
) ;
54
59
55
- private onlineState = OnlineState . Unknown ;
60
+ onlineState = OnlineState . Unknown ;
56
61
57
62
private snapshotsInSyncListeners : Set < Observer < void > > = new Set ( ) ;
58
63
59
- constructor ( private syncEngine : SyncEngine ) {
64
+ constructor ( readonly syncEngine : SyncEngine ) {
60
65
this . syncEngine . subscribe ( this ) ;
61
66
}
62
67
63
- async listen ( listener : QueryListener ) : Promise < void > {
64
- const query = listener . query ;
65
- let firstListen = false ;
66
-
67
- let queryInfo = this . queries . get ( query ) ;
68
- if ( ! queryInfo ) {
69
- firstListen = true ;
70
- queryInfo = new QueryListenersInfo ( ) ;
71
- }
72
-
73
- if ( firstListen ) {
74
- try {
75
- queryInfo . viewSnap = await this . syncEngine . listen ( query ) ;
76
- } catch ( e ) {
77
- const firestoreError = wrapInUserErrorIfRecoverable (
78
- e ,
79
- `Initialization of query '${ stringifyQuery ( listener . query ) } ' failed`
80
- ) ;
81
- listener . onError ( firestoreError ) ;
82
- return ;
83
- }
84
- }
85
-
86
- this . queries . set ( query , queryInfo ) ;
87
- queryInfo . listeners . push ( listener ) ;
88
-
89
- // Run global snapshot listeners if a consistent snapshot has been emitted.
90
- const raisedEvent = listener . applyOnlineStateChange ( this . onlineState ) ;
91
- debugAssert (
92
- ! raisedEvent ,
93
- "applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
94
- ) ;
95
-
96
- if ( queryInfo . viewSnap ) {
97
- const raisedEvent = listener . onViewSnapshot ( queryInfo . viewSnap ) ;
98
- if ( raisedEvent ) {
99
- this . raiseSnapshotsInSyncEvent ( ) ;
100
- }
101
- }
102
- }
103
-
104
- async unlisten ( listener : QueryListener ) : Promise < void > {
105
- const query = listener . query ;
106
- let lastListen = false ;
107
-
108
- const queryInfo = this . queries . get ( query ) ;
109
- if ( queryInfo ) {
110
- const i = queryInfo . listeners . indexOf ( listener ) ;
111
- if ( i >= 0 ) {
112
- queryInfo . listeners . splice ( i , 1 ) ;
113
- lastListen = queryInfo . listeners . length === 0 ;
114
- }
115
- }
116
-
117
- if ( lastListen ) {
118
- this . queries . delete ( query ) ;
119
- return this . syncEngine . unlisten ( query ) ;
120
- }
121
- }
122
-
123
68
onWatchChange ( viewSnaps : ViewSnapshot [ ] ) : void {
124
69
let raisedEvent = false ;
125
70
for ( const viewSnap of viewSnaps ) {
@@ -180,7 +125,7 @@ export class EventManager implements SyncEngineListener {
180
125
}
181
126
182
127
// Call all global snapshot listeners that have been set.
183
- private raiseSnapshotsInSyncEvent ( ) : void {
128
+ raiseSnapshotsInSyncEvent ( ) : void {
184
129
this . snapshotsInSyncListeners . forEach ( observer => {
185
130
observer . next ( ) ;
186
131
} ) ;
@@ -357,3 +302,72 @@ export class QueryListener {
357
302
this . queryObserver . next ( snap ) ;
358
303
}
359
304
}
305
+
306
+ export async function listen (
307
+ eventManager : EventManager ,
308
+ listener : QueryListener
309
+ ) : Promise < void > {
310
+ const query = listener . query ;
311
+ let firstListen = false ;
312
+
313
+ let queryInfo = eventManager . queries . get ( query ) ;
314
+ if ( ! queryInfo ) {
315
+ firstListen = true ;
316
+ queryInfo = new QueryListenersInfo ( ) ;
317
+ }
318
+
319
+ if ( firstListen ) {
320
+ try {
321
+ queryInfo . viewSnap = await syncEngineListen (
322
+ eventManager . syncEngine ,
323
+ query
324
+ ) ;
325
+ } catch ( e ) {
326
+ const firestoreError = wrapInUserErrorIfRecoverable (
327
+ e ,
328
+ `Initialization of query '${ stringifyQuery ( listener . query ) } ' failed`
329
+ ) ;
330
+ listener . onError ( firestoreError ) ;
331
+ return ;
332
+ }
333
+ }
334
+
335
+ eventManager . queries . set ( query , queryInfo ) ;
336
+ queryInfo . listeners . push ( listener ) ;
337
+
338
+ // Run global snapshot listeners if a consistent snapshot has been emitted.
339
+ const raisedEvent = listener . applyOnlineStateChange ( eventManager . onlineState ) ;
340
+ debugAssert (
341
+ ! raisedEvent ,
342
+ "applyOnlineStateChange() shouldn't raise an event for brand-new listeners."
343
+ ) ;
344
+
345
+ if ( queryInfo . viewSnap ) {
346
+ const raisedEvent = listener . onViewSnapshot ( queryInfo . viewSnap ) ;
347
+ if ( raisedEvent ) {
348
+ eventManager . raiseSnapshotsInSyncEvent ( ) ;
349
+ }
350
+ }
351
+ }
352
+
353
+ export async function unlisten (
354
+ eventManager : EventManager ,
355
+ listener : QueryListener
356
+ ) : Promise < void > {
357
+ const query = listener . query ;
358
+ let lastListen = false ;
359
+
360
+ const queryInfo = eventManager . queries . get ( query ) ;
361
+ if ( queryInfo ) {
362
+ const i = queryInfo . listeners . indexOf ( listener ) ;
363
+ if ( i >= 0 ) {
364
+ queryInfo . listeners . splice ( i , 1 ) ;
365
+ lastListen = queryInfo . listeners . length === 0 ;
366
+ }
367
+ }
368
+
369
+ if ( lastListen ) {
370
+ eventManager . queries . delete ( query ) ;
371
+ return syncEngineUnlisten ( eventManager . syncEngine , query ) ;
372
+ }
373
+ }
0 commit comments