@@ -29,13 +29,27 @@ import {
29
29
import { IndexedDbPersistence } from '../../../src/local/indexeddb_persistence' ;
30
30
import { PersistencePromise } from '../../../src/local/persistence_promise' ;
31
31
import { IndexedDbTransactionError } from '../../../src/local/simple_db' ;
32
- import { debugAssert } from '../../../src/util/assert' ;
32
+ import { debugAssert , fail } from '../../../src/util/assert' ;
33
33
import {
34
34
MemoryEagerDelegate ,
35
35
MemoryLruDelegate ,
36
36
MemoryPersistence
37
37
} from '../../../src/local/memory_persistence' ;
38
38
import { LruParams } from '../../../src/local/lru_garbage_collector' ;
39
+ import { Connection , Stream } from '../../../src/remote/connection' ;
40
+ import { StreamBridge } from '../../../src/remote/stream_bridge' ;
41
+ import * as api from '../../../src/protos/firestore_proto_api' ;
42
+ import { Deferred } from '../../../src/util/promise' ;
43
+ import { AsyncQueue } from '../../../src/util/async_queue' ;
44
+ import { WriteRequest } from '../../../src/remote/persistent_stream' ;
45
+ import { PlatformSupport } from '../../../src/platform/platform' ;
46
+ import { FirestoreError } from '../../../src/util/error' ;
47
+ import { Token } from '../../../src/api/credentials' ;
48
+ import { Observer } from '../../../src/core/event_manager' ;
49
+ import { ViewSnapshot } from '../../../src/core/view_snapshot' ;
50
+ import { Query } from '../../../src/core/query' ;
51
+ import { expectFirestoreError } from '../../util/helpers' ;
52
+ import { Mutation } from '../../../src/model/mutation' ;
39
53
40
54
/**
41
55
* A test-only MemoryPersistence implementation that is able to inject
@@ -51,7 +65,7 @@ export class MockMemoryPersistence extends MemoryPersistence {
51
65
transaction : PersistenceTransaction
52
66
) => PersistencePromise < T >
53
67
) : Promise < T > {
54
- if ( this . injectFailures ) {
68
+ if ( this . injectFailures ) {
55
69
return Promise . reject (
56
70
new IndexedDbTransactionError ( new Error ( 'Simulated retryable error' ) )
57
71
) ;
@@ -143,3 +157,251 @@ export class MockMemoryComponentProvider extends MemoryComponentProvider {
143
157
) ;
144
158
}
145
159
}
160
+
161
+ export class MockConnection implements Connection {
162
+ watchStream : StreamBridge <
163
+ api . ListenRequest ,
164
+ api . ListenResponse
165
+ > | null = null ;
166
+ writeStream : StreamBridge < api . WriteRequest , api . WriteResponse > | null = null ;
167
+ /**
168
+ * Used to make sure a write was actually sent out on the network before the
169
+ * test runner continues.
170
+ */
171
+ writeSendBarriers : Array < Deferred < api . WriteRequest > > = [ ] ;
172
+
173
+ /**
174
+ * The set of mutations sent out before there was a corresponding
175
+ * writeSendBarrier.
176
+ */
177
+ earlyWrites : api . WriteRequest [ ] = [ ] ;
178
+
179
+ /** The total number of requests sent to the watch stream. */
180
+ watchStreamRequestCount = 0 ;
181
+
182
+ /** The total number of requests sent to the write stream. */
183
+ writeStreamRequestCount = 0 ;
184
+
185
+ nextWriteStreamToken = 0 ;
186
+
187
+ constructor ( private queue : AsyncQueue ) { }
188
+
189
+ /**
190
+ * Tracks the currently active watch targets as detected by the mock watch
191
+ * stream, as a mapping from target ID to query Target.
192
+ */
193
+ activeTargets : { [ targetId : number ] : api . Target } = { } ;
194
+
195
+ /** A Deferred that is resolved once watch opens. */
196
+ watchOpen = new Deferred < void > ( ) ;
197
+
198
+ invokeRPC < Req > ( rpcName : string , request : Req ) : never {
199
+ throw new Error ( 'Not implemented!' ) ;
200
+ }
201
+
202
+ invokeStreamingRPC < Req > ( rpcName : string , request : Req ) : never {
203
+ throw new Error ( 'Not implemented!' ) ;
204
+ }
205
+
206
+ waitForWriteRequest ( ) : Promise < api . WriteRequest > {
207
+ const earlyWrite = this . earlyWrites . shift ( ) ;
208
+ if ( earlyWrite ) {
209
+ return Promise . resolve ( earlyWrite ) ;
210
+ }
211
+ const barrier = new Deferred < WriteRequest > ( ) ;
212
+ this . writeSendBarriers . push ( barrier ) ;
213
+ return barrier . promise ;
214
+ }
215
+
216
+ waitForWatchOpen ( ) : Promise < void > {
217
+ return this . watchOpen . promise ;
218
+ }
219
+
220
+ ackWrite (
221
+ commitTime ?: api . Timestamp ,
222
+ mutationResults ?: api . WriteResult [ ]
223
+ ) : void {
224
+ this . writeStream ! . callOnMessage ( {
225
+ // Convert to base64 string so it can later be parsed into ByteString.
226
+ streamToken : PlatformSupport . getPlatform ( ) . btoa (
227
+ 'write-stream-token-' + this . nextWriteStreamToken
228
+ ) ,
229
+ commitTime,
230
+ writeResults : mutationResults
231
+ } ) ;
232
+ this . nextWriteStreamToken ++ ;
233
+ }
234
+
235
+ failWrite ( err : FirestoreError ) : void {
236
+ this . resetAndCloseWriteStream ( err ) ;
237
+ }
238
+
239
+ private resetAndCloseWriteStream ( err ?: FirestoreError ) : void {
240
+ this . writeSendBarriers = [ ] ;
241
+ this . earlyWrites = [ ] ;
242
+ this . writeStream ! . callOnClose ( err ) ;
243
+ this . writeStream = null ;
244
+ }
245
+
246
+ failWatchStream ( err ?: FirestoreError ) : void {
247
+ this . resetAndCloseWatchStream ( err ) ;
248
+ }
249
+
250
+ private resetAndCloseWatchStream ( err ?: FirestoreError ) : void {
251
+ this . activeTargets = { } ;
252
+ this . watchOpen = new Deferred < void > ( ) ;
253
+ this . watchStream ! . callOnClose ( err ) ;
254
+ this . watchStream = null ;
255
+ }
256
+
257
+ openStream < Req , Resp > (
258
+ rpcName : string ,
259
+ token : Token | null
260
+ ) : Stream < Req , Resp > {
261
+ if ( rpcName === 'Write' ) {
262
+ if ( this . writeStream !== null ) {
263
+ throw new Error ( 'write stream opened twice' ) ;
264
+ }
265
+ let firstCall = true ;
266
+ const writeStream = new StreamBridge < WriteRequest , api . WriteResponse > ( {
267
+ sendFn : ( request : WriteRequest ) => {
268
+ ++ this . writeStreamRequestCount ;
269
+ if ( firstCall ) {
270
+ debugAssert (
271
+ ! ! request . database ,
272
+ 'projectId must be set in the first message'
273
+ ) ;
274
+ debugAssert (
275
+ ! request . writes ,
276
+ 'mutations must not be set in first request'
277
+ ) ;
278
+ this . ackWrite ( ) ; // just send the token
279
+ firstCall = false ;
280
+ return ;
281
+ }
282
+
283
+ debugAssert (
284
+ ! ! request . streamToken ,
285
+ 'streamToken must be set on all writes'
286
+ ) ;
287
+ debugAssert ( ! ! request . writes , 'writes must be set on all writes' ) ;
288
+
289
+ const barrier = this . writeSendBarriers . shift ( ) ;
290
+ if ( ! barrier ) {
291
+ // The test runner hasn't set up the barrier yet, so we queue
292
+ // up this mutation to provide to the barrier promise when it
293
+ // arrives.
294
+ this . earlyWrites . push ( request ) ;
295
+ } else {
296
+ // The test runner is waiting on a write invocation, now that we
297
+ // have it we can resolve the write send barrier. If we add
298
+ // (automatic) batching support we need to make sure the number of
299
+ // batches matches the number of calls to waitForWriteRequest.
300
+ barrier . resolve ( request ) ;
301
+ }
302
+ } ,
303
+ closeFn : ( ) => {
304
+ this . resetAndCloseWriteStream ( ) ;
305
+ }
306
+ } ) ;
307
+ this . queue . enqueueAndForget ( async ( ) => {
308
+ if ( this . writeStream === writeStream ) {
309
+ writeStream . callOnOpen ( ) ;
310
+ }
311
+ } ) ;
312
+ this . writeStream = writeStream ;
313
+ // Replace 'any' with conditional types.
314
+ return writeStream as any ; // eslint-disable-line @typescript-eslint/no-explicit-any
315
+ } else {
316
+ debugAssert ( rpcName === 'Listen' , 'Unexpected rpc name: ' + rpcName ) ;
317
+ if ( this . watchStream !== null ) {
318
+ throw new Error ( 'Stream opened twice!' ) ;
319
+ }
320
+ const watchStream = new StreamBridge <
321
+ api . ListenRequest ,
322
+ api . ListenResponse
323
+ > ( {
324
+ sendFn : ( request : api . ListenRequest ) => {
325
+ ++ this . watchStreamRequestCount ;
326
+ if ( request . addTarget ) {
327
+ const targetId = request . addTarget . targetId ! ;
328
+ this . activeTargets [ targetId ] = request . addTarget ;
329
+ } else if ( request . removeTarget ) {
330
+ delete this . activeTargets [ request . removeTarget ] ;
331
+ } else {
332
+ fail ( 'Invalid listen request' ) ;
333
+ }
334
+ } ,
335
+ closeFn : ( ) => {
336
+ this . resetAndCloseWatchStream ( ) ;
337
+ }
338
+ } ) ;
339
+ // Call on open immediately after returning
340
+ this . queue . enqueueAndForget ( async ( ) => {
341
+ if ( this . watchStream === watchStream ) {
342
+ watchStream . callOnOpen ( ) ;
343
+ this . watchOpen . resolve ( ) ;
344
+ }
345
+ } ) ;
346
+ this . watchStream = watchStream ;
347
+ // Replace 'any' with conditional types.
348
+ return this . watchStream as any ; // eslint-disable-line @typescript-eslint/no-explicit-any
349
+ }
350
+ }
351
+ }
352
+
353
+ /**
354
+ * An Observer<ViewSnapshot> that forwards events to the provided callback.
355
+ */
356
+ export class EventAggregator implements Observer < ViewSnapshot > {
357
+ constructor (
358
+ private query : Query ,
359
+ private pushEvent : ( e : QueryEvent ) => void
360
+ ) { }
361
+
362
+ next ( view : ViewSnapshot ) : void {
363
+ this . pushEvent ( {
364
+ query : view . query ,
365
+ view
366
+ } ) ;
367
+ }
368
+
369
+ error ( error : Error ) : void {
370
+ expectFirestoreError ( error ) ;
371
+ this . pushEvent ( { query : this . query , error : error as FirestoreError } ) ;
372
+ }
373
+ }
374
+
375
+ /**
376
+ * FIFO queue that tracks all outstanding mutations for a single test run.
377
+ * As these mutations are shared among the set of active clients, any client can
378
+ * add or retrieve mutations.
379
+ */
380
+ // PORTING NOTE: Multi-tab only.
381
+ export class SharedWriteTracker {
382
+ private writes : Mutation [ ] [ ] = [ ] ;
383
+
384
+ push ( write : Mutation [ ] ) : void {
385
+ this . writes . push ( write ) ;
386
+ }
387
+
388
+ peek ( ) : Mutation [ ] {
389
+ debugAssert ( this . writes . length > 0 , 'No pending mutations' ) ;
390
+ return this . writes [ 0 ] ;
391
+ }
392
+
393
+ shift ( ) : Mutation [ ] {
394
+ debugAssert ( this . writes . length > 0 , 'No pending mutations' ) ;
395
+ return this . writes . shift ( ) ! ;
396
+ }
397
+ }
398
+
399
+ /**
400
+ * Interface used for object that contain exactly one of either a view snapshot
401
+ * or an error for the given query.
402
+ */
403
+ export interface QueryEvent {
404
+ query : Query ;
405
+ view ?: ViewSnapshot ;
406
+ error ?: FirestoreError ;
407
+ }
0 commit comments