@@ -208,12 +208,10 @@ export class FirestoreClient {
208
208
/** Enables the network connection and requeues all pending operations. */
209
209
enableNetwork ( ) : Promise < void > {
210
210
this . verifyNotTerminated ( ) ;
211
- return enqueueNetworkEnabled (
212
- this . asyncQueue ,
213
- this . remoteStore ,
214
- this . persistence ,
215
- /* enabled= */ true
216
- ) ;
211
+ return this . asyncQueue . enqueue ( ( ) => {
212
+ this . persistence . setNetworkEnabled ( true ) ;
213
+ return this . remoteStore . enableNetwork ( ) ;
214
+ } ) ;
217
215
}
218
216
219
217
/**
@@ -353,12 +351,10 @@ export class FirestoreClient {
353
351
/** Disables the network connection. Pending operations will not complete. */
354
352
disableNetwork ( ) : Promise < void > {
355
353
this . verifyNotTerminated ( ) ;
356
- return enqueueNetworkEnabled (
357
- this . asyncQueue ,
358
- this . remoteStore ,
359
- this . persistence ,
360
- /* enabled= */ false
361
- ) ;
354
+ return this . asyncQueue . enqueue ( ( ) => {
355
+ this . persistence . setNetworkEnabled ( false ) ;
356
+ return this . remoteStore . disableNetwork ( ) ;
357
+ } ) ;
362
358
}
363
359
364
360
terminate ( ) : Promise < void > {
@@ -386,50 +382,74 @@ export class FirestoreClient {
386
382
*/
387
383
waitForPendingWrites ( ) : Promise < void > {
388
384
this . verifyNotTerminated ( ) ;
389
- return enqueueWaitForPendingWrites ( this . asyncQueue , this . syncEngine ) ;
385
+
386
+ const deferred = new Deferred < void > ( ) ;
387
+ this . asyncQueue . enqueueAndForget ( ( ) => {
388
+ return this . syncEngine . registerPendingWritesCallback ( deferred ) ;
389
+ } ) ;
390
+ return deferred . promise ;
390
391
}
391
392
392
393
listen (
393
394
query : Query ,
394
395
options : ListenOptions ,
395
396
observer : Partial < Observer < ViewSnapshot > >
396
- ) : Unsubscribe {
397
+ ) : ( ) => void {
397
398
this . verifyNotTerminated ( ) ;
398
- return enqueueListen (
399
- this . asyncQueue ,
400
- this . eventMgr ,
401
- query ,
402
- options ,
403
- observer
404
- ) ;
399
+ const wrappedObserver = new AsyncObserver ( observer ) ;
400
+ const listener = new QueryListener ( query , wrappedObserver , options ) ;
401
+ this . asyncQueue . enqueueAndForget ( ( ) => this . eventMgr . listen ( listener ) ) ;
402
+ return ( ) => {
403
+ wrappedObserver . mute ( ) ;
404
+ this . asyncQueue . enqueueAndForget ( ( ) => this . eventMgr . unlisten ( listener ) ) ;
405
+ } ;
405
406
}
406
407
407
- getDocumentFromLocalCache ( docKey : DocumentKey ) : Promise < Document | null > {
408
+ async getDocumentFromLocalCache (
409
+ docKey : DocumentKey
410
+ ) : Promise < Document | null > {
408
411
this . verifyNotTerminated ( ) ;
409
- return enqueueReadDocument ( this . asyncQueue , this . localStore , docKey ) ;
412
+ const deferred = new Deferred < Document | null > ( ) ;
413
+ await this . asyncQueue . enqueue ( ( ) =>
414
+ readDocument ( this . localStore , docKey , deferred )
415
+ ) ;
416
+ return deferred . promise ;
410
417
}
411
418
412
- getDocumentsFromLocalCache ( query : Query ) : Promise < ViewSnapshot > {
419
+ async getDocumentsFromLocalCache ( query : Query ) : Promise < ViewSnapshot > {
413
420
this . verifyNotTerminated ( ) ;
414
- return enqueueExecuteQuery ( this . asyncQueue , this . localStore , query ) ;
421
+ const deferred = new Deferred < ViewSnapshot > ( ) ;
422
+ await this . asyncQueue . enqueue ( ( ) =>
423
+ executeQuery ( this . localStore , query , deferred )
424
+ ) ;
425
+ return deferred . promise ;
415
426
}
416
427
417
428
write ( mutations : Mutation [ ] ) : Promise < void > {
418
429
this . verifyNotTerminated ( ) ;
419
- return enqueueWrite ( this . asyncQueue , this . syncEngine , mutations ) ;
430
+ const deferred = new Deferred < void > ( ) ;
431
+ this . asyncQueue . enqueueAndForget ( ( ) =>
432
+ this . syncEngine . write ( mutations , deferred )
433
+ ) ;
434
+ return deferred . promise ;
420
435
}
421
436
422
437
databaseId ( ) : DatabaseId {
423
438
return this . databaseInfo . databaseId ;
424
439
}
425
440
426
- addSnapshotsInSyncListener ( observer : Partial < Observer < void > > ) : Unsubscribe {
441
+ addSnapshotsInSyncListener ( observer : Partial < Observer < void > > ) : ( ) => void {
427
442
this . verifyNotTerminated ( ) ;
428
- return enqueueSnapshotsInSyncListen (
429
- this . asyncQueue ,
430
- this . eventMgr ,
431
- observer
443
+ const wrappedObserver = new AsyncObserver ( observer ) ;
444
+ this . asyncQueue . enqueueAndForget ( async ( ) =>
445
+ this . eventMgr . addSnapshotsInSyncListener ( wrappedObserver )
432
446
) ;
447
+ return ( ) => {
448
+ wrappedObserver . mute ( ) ;
449
+ this . asyncQueue . enqueueAndForget ( async ( ) =>
450
+ this . eventMgr . removeSnapshotsInSyncListener ( wrappedObserver )
451
+ ) ;
452
+ } ;
433
453
}
434
454
435
455
get clientTerminated ( ) : boolean {
@@ -518,68 +538,79 @@ export function enqueueSnapshotsInSyncListen(
518
538
} ;
519
539
}
520
540
541
+ async function readDocument (
542
+ localStore : LocalStore ,
543
+ docKey : DocumentKey ,
544
+ deferred : Deferred < Document | null >
545
+ ) : Promise < void > {
546
+ try {
547
+ const maybeDoc = await localStore . readDocument ( docKey ) ;
548
+ if ( maybeDoc instanceof Document ) {
549
+ deferred . resolve ( maybeDoc ) ;
550
+ } else if ( maybeDoc instanceof NoDocument ) {
551
+ deferred . resolve ( null ) ;
552
+ } else {
553
+ deferred . reject (
554
+ new FirestoreError (
555
+ Code . UNAVAILABLE ,
556
+ 'Failed to get document from cache. (However, this document may ' +
557
+ "exist on the server. Run again without setting 'source' in " +
558
+ 'the GetOptions to attempt to retrieve the document from the ' +
559
+ 'server.)'
560
+ )
561
+ ) ;
562
+ }
563
+ } catch ( e ) {
564
+ const firestoreError = wrapInUserErrorIfRecoverable (
565
+ e ,
566
+ `Failed to get document '${ docKey } from cache`
567
+ ) ;
568
+ deferred . reject ( firestoreError ) ;
569
+ }
570
+ }
571
+
521
572
export async function enqueueReadDocument (
522
573
asyncQueue : AsyncQueue ,
523
574
localStore : LocalStore ,
524
575
docKey : DocumentKey
525
576
) : Promise < Document | null > {
526
577
const deferred = new Deferred < Document | null > ( ) ;
527
- await asyncQueue . enqueue ( async ( ) => {
528
- try {
529
- const maybeDoc = await localStore . readDocument ( docKey ) ;
530
- if ( maybeDoc instanceof Document ) {
531
- deferred . resolve ( maybeDoc ) ;
532
- } else if ( maybeDoc instanceof NoDocument ) {
533
- deferred . resolve ( null ) ;
534
- } else {
535
- deferred . reject (
536
- new FirestoreError (
537
- Code . UNAVAILABLE ,
538
- 'Failed to get document from cache. (However, this document may ' +
539
- "exist on the server. Run again without setting 'source' in " +
540
- 'the GetOptions to attempt to retrieve the document from the ' +
541
- 'server.)'
542
- )
543
- ) ;
544
- }
545
- } catch ( e ) {
546
- const firestoreError = wrapInUserErrorIfRecoverable (
547
- e ,
548
- `Failed to get document '${ docKey } from cache`
549
- ) ;
550
- deferred . reject ( firestoreError ) ;
551
- }
552
- } ) ;
553
-
578
+ await asyncQueue . enqueue ( ( ) => readDocument ( localStore , docKey , deferred ) ) ;
554
579
return deferred . promise ;
555
580
}
556
581
582
+ async function executeQuery (
583
+ localStore : LocalStore ,
584
+ query : Query ,
585
+ deferred : Deferred < ViewSnapshot >
586
+ ) : Promise < void > {
587
+ try {
588
+ const queryResult = await localStore . executeQuery (
589
+ query ,
590
+ /* usePreviousResults= */ true
591
+ ) ;
592
+ const view = new View ( query , queryResult . remoteKeys ) ;
593
+ const viewDocChanges = view . computeDocChanges ( queryResult . documents ) ;
594
+ const viewChange = view . applyChanges (
595
+ viewDocChanges ,
596
+ /* updateLimboDocuments= */ false
597
+ ) ;
598
+ deferred . resolve ( viewChange . snapshot ! ) ;
599
+ } catch ( e ) {
600
+ const firestoreError = wrapInUserErrorIfRecoverable (
601
+ e ,
602
+ `Failed to execute query '${ query } against cache`
603
+ ) ;
604
+ deferred . reject ( firestoreError ) ;
605
+ }
606
+ }
607
+
557
608
export async function enqueueExecuteQuery (
558
609
asyncQueue : AsyncQueue ,
559
610
localStore : LocalStore ,
560
611
query : Query
561
612
) : Promise < ViewSnapshot > {
562
613
const deferred = new Deferred < ViewSnapshot > ( ) ;
563
- await asyncQueue . enqueue ( async ( ) => {
564
- try {
565
- const queryResult = await localStore . executeQuery (
566
- query ,
567
- /* usePreviousResults= */ true
568
- ) ;
569
- const view = new View ( query , queryResult . remoteKeys ) ;
570
- const viewDocChanges = view . computeDocChanges ( queryResult . documents ) ;
571
- const viewChange = view . applyChanges (
572
- viewDocChanges ,
573
- /* updateLimboDocuments= */ false
574
- ) ;
575
- deferred . resolve ( viewChange . snapshot ! ) ;
576
- } catch ( e ) {
577
- const firestoreError = wrapInUserErrorIfRecoverable (
578
- e ,
579
- `Failed to execute query '${ query } against cache`
580
- ) ;
581
- deferred . reject ( firestoreError ) ;
582
- }
583
- } ) ;
614
+ await asyncQueue . enqueue ( ( ) => executeQuery ( localStore , query , deferred ) ) ;
584
615
return deferred . promise ;
585
616
}
0 commit comments