@@ -147,7 +147,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
147
147
private queryViewsByQuery = new ObjectMap < Query , QueryView > ( q =>
148
148
q . canonicalId ( )
149
149
) ;
150
- private queryViewsByTarget : { [ targetId : number ] : QueryView } = { } ;
150
+ private queriesByTarget : { [ targetId : number ] : Query [ ] } = { } ;
151
151
private limboTargetsByKey = new SortedMap < DocumentKey , TargetId > (
152
152
DocumentKey . comparator
153
153
) ;
@@ -223,7 +223,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
223
223
targetId = queryData . targetId ;
224
224
viewSnapshot = await this . initializeViewAndComputeSnapshot (
225
225
query ,
226
- queryData . targetId ,
226
+ targetId ,
227
227
status === 'current'
228
228
) ;
229
229
if ( this . isPrimary ) {
@@ -270,7 +270,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
270
270
271
271
const data = new QueryView ( query , targetId , view ) ;
272
272
this . queryViewsByQuery . set ( query , data ) ;
273
- this . queryViewsByTarget [ targetId ] = data ;
273
+ if ( ! this . queriesByTarget [ targetId ] ) {
274
+ this . queriesByTarget [ targetId ] = [ ] ;
275
+ }
276
+ this . queriesByTarget [ targetId ] . push ( query ) ;
274
277
return viewChange . snapshot ! ;
275
278
}
276
279
@@ -302,6 +305,8 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
302
305
const queryView = this . queryViewsByQuery . get ( query ) ! ;
303
306
assert ( ! ! queryView , 'Trying to unlisten on query not found:' + query ) ;
304
307
308
+ // TODO(wuandy): Note this does not handle the case where multiple queries
309
+ // map to one target, and user request to unlisten on of the queries.
305
310
if ( this . isPrimary ) {
306
311
// We need to remove the local query target first to allow us to verify
307
312
// whether any other client is still interested in this target.
@@ -312,18 +317,18 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
312
317
313
318
if ( ! targetRemainsActive ) {
314
319
await this . localStore
315
- . releaseQuery ( query , /*keepPersistedQueryData=*/ false )
320
+ . releaseTarget ( queryView . targetId , /*keepPersistedQueryData=*/ false )
316
321
. then ( ( ) => {
317
322
this . sharedClientState . clearQueryState ( queryView . targetId ) ;
318
323
this . remoteStore . unlisten ( queryView . targetId ) ;
319
- this . removeAndCleanupQuery ( queryView ) ;
324
+ this . removeAndCleanupTarget ( queryView . targetId ) ;
320
325
} )
321
326
. catch ( ignoreIfPrimaryLeaseLoss ) ;
322
327
}
323
328
} else {
324
- this . removeAndCleanupQuery ( queryView ) ;
325
- await this . localStore . releaseQuery (
326
- query ,
329
+ this . removeAndCleanupTarget ( queryView . targetId ) ;
330
+ await this . localStore . releaseTarget (
331
+ queryView . targetId ,
327
332
/*keepPersistedQueryData=*/ true
328
333
) ;
329
334
}
@@ -495,13 +500,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
495
500
) ;
496
501
return this . applyRemoteEvent ( event ) ;
497
502
} else {
498
- const queryView = this . queryViewsByTarget [ targetId ] ;
499
- assert ( ! ! queryView , 'Unknown targetId: ' + targetId ) ;
500
503
await this . localStore
501
- . releaseQuery ( queryView . query , /* keepPersistedQueryData */ false )
502
- . then ( ( ) => this . removeAndCleanupQuery ( queryView ) )
504
+ . releaseTarget ( targetId , /* keepPersistedQueryData */ false )
505
+ . then ( ( ) => this . removeAndCleanupTarget ( targetId , err ) )
503
506
. catch ( ignoreIfPrimaryLeaseLoss ) ;
504
- this . syncEngineListener ! . onWatchError ( queryView . query , err ) ;
505
507
}
506
508
}
507
509
@@ -681,17 +683,30 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
681
683
}
682
684
}
683
685
684
- private removeAndCleanupQuery ( queryView : QueryView ) : void {
685
- this . sharedClientState . removeLocalQueryTarget ( queryView . targetId ) ;
686
+ private removeAndCleanupTarget (
687
+ targetId : number ,
688
+ error : Error | null = null
689
+ ) : void {
690
+ this . sharedClientState . removeLocalQueryTarget ( targetId ) ;
691
+
692
+ assert (
693
+ this . queriesByTarget [ targetId ] &&
694
+ this . queriesByTarget [ targetId ] . length !== 0 ,
695
+ `There are no queries mapped to target id ${ targetId } `
696
+ ) ;
697
+
698
+ for ( const query of this . queriesByTarget [ targetId ] ) {
699
+ this . queryViewsByQuery . delete ( query ) ;
700
+ if ( error ) {
701
+ this . syncEngineListener ! . onWatchError ( query , error ) ;
702
+ }
703
+ }
686
704
687
- this . queryViewsByQuery . delete ( queryView . query ) ;
688
- delete this . queryViewsByTarget [ queryView . targetId ] ;
705
+ delete this . queriesByTarget [ targetId ] ;
689
706
690
707
if ( this . isPrimary ) {
691
- const limboKeys = this . limboDocumentRefs . referencesForId (
692
- queryView . targetId
693
- ) ;
694
- this . limboDocumentRefs . removeReferencesForId ( queryView . targetId ) ;
708
+ const limboKeys = this . limboDocumentRefs . referencesForId ( targetId ) ;
709
+ this . limboDocumentRefs . removeReferencesForId ( targetId ) ;
695
710
limboKeys . forEach ( limboKey => {
696
711
const isReferenced = this . limboDocumentRefs . containsKey ( limboKey ) ;
697
712
if ( ! isReferenced ) {
@@ -710,6 +725,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
710
725
// This target already got removed, because the query failed.
711
726
return ;
712
727
}
728
+
713
729
this . remoteStore . unlisten ( limboTargetId ) ;
714
730
this . limboTargetsByKey = this . limboTargetsByKey . remove ( key ) ;
715
731
delete this . limboResolutionsByTarget [ limboTargetId ] ;
@@ -885,13 +901,19 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
885
901
const activeTargets : TargetId [ ] = [ ] ;
886
902
887
903
let p = Promise . resolve ( ) ;
888
- objUtils . forEachNumber ( this . queryViewsByTarget , ( targetId , queryView ) => {
904
+ objUtils . forEachNumber ( this . queriesByTarget , ( targetId , _ ) => {
889
905
if ( this . sharedClientState . isLocalQueryTarget ( targetId ) ) {
890
906
activeTargets . push ( targetId ) ;
891
907
} else {
892
- p = p . then ( ( ) => this . unlisten ( queryView . query ) ) ;
908
+ p = p . then ( ( ) => {
909
+ this . removeAndCleanupTarget ( targetId ) ;
910
+ return this . localStore . releaseTarget (
911
+ targetId ,
912
+ /*keepPersistedQueryData=*/ true
913
+ ) ;
914
+ } ) ;
893
915
}
894
- this . remoteStore . unlisten ( queryView . targetId ) ;
916
+ this . remoteStore . unlisten ( targetId ) ;
895
917
} ) ;
896
918
await p ;
897
919
@@ -926,27 +948,34 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
926
948
const newViewSnapshots : ViewSnapshot [ ] = [ ] ;
927
949
for ( const targetId of targets ) {
928
950
let queryData : QueryData ;
929
- const queryView = this . queryViewsByTarget [ targetId ] ;
930
- if ( queryView ) {
951
+ const queries = this . queriesByTarget [ targetId ] ;
952
+
953
+ if ( queries && queries . length !== 0 ) {
931
954
// For queries that have a local View, we need to update their state
932
955
// in LocalStore (as the resume token and the snapshot version
933
956
// might have changed) and reconcile their views with the persisted
934
957
// state (the list of syncedDocuments may have gotten out of sync).
935
- await this . localStore . releaseQuery (
936
- queryView . query ,
958
+ await this . localStore . releaseTarget (
959
+ targetId ,
937
960
/*keepPersistedQueryData=*/ true
938
961
) ;
939
- queryData = await this . localStore . allocateQuery ( queryView . query ) ;
940
- const viewChange = await this . synchronizeViewAndComputeSnapshot (
941
- queryView
942
- ) ;
943
- if ( viewChange . snapshot ) {
944
- newViewSnapshots . push ( viewChange . snapshot ) ;
962
+ queryData = await this . localStore . allocateTarget ( queries [ 0 ] . toTarget ( ) ) ;
963
+
964
+ for ( const query of queries ) {
965
+ const queryView = this . queryViewsByQuery . get ( query ) ;
966
+ assert ( ! ! queryView , `No query view found for ${ query } ` ) ;
967
+
968
+ const viewChange = await this . synchronizeViewAndComputeSnapshot (
969
+ queryView !
970
+ ) ;
971
+ if ( viewChange . snapshot ) {
972
+ newViewSnapshots . push ( viewChange . snapshot ) ;
973
+ }
945
974
}
946
975
} else {
947
976
assert (
948
977
this . isPrimary === true ,
949
- 'A secondary tab should never have an active query without an active view .'
978
+ 'A secondary tab should never have an active target without an active query .'
950
979
) ;
951
980
// For queries that never executed on this client, we need to
952
981
// allocate the target in LocalStore and initialize a new View.
@@ -959,8 +988,10 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
959
988
/*current=*/ false
960
989
) ;
961
990
}
962
- activeQueries . push ( queryData ) ;
991
+
992
+ activeQueries . push ( queryData ! ) ;
963
993
}
994
+
964
995
this . syncEngineListener ! . onWatchChange ( newViewSnapshots ) ;
965
996
return activeQueries ;
966
997
}
@@ -983,7 +1014,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
983
1014
return ;
984
1015
}
985
1016
986
- if ( this . queryViewsByTarget [ targetId ] ) {
1017
+ if ( this . queriesByTarget [ targetId ] ) {
987
1018
switch ( state ) {
988
1019
case 'current' :
989
1020
case 'not-current' : {
@@ -999,13 +1030,11 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
999
1030
break ;
1000
1031
}
1001
1032
case 'rejected' : {
1002
- const queryView = this . queryViewsByTarget [ targetId ] ;
1003
- this . removeAndCleanupQuery ( queryView ) ;
1004
- await this . localStore . releaseQuery (
1005
- queryView . query ,
1006
- /*keepPersistedQueryData=*/ true
1033
+ await this . localStore . releaseTarget (
1034
+ targetId ,
1035
+ /* keepPersistedQueryData */ true
1007
1036
) ;
1008
- this . syncEngineListener ! . onWatchError ( queryView . query , error ! ) ;
1037
+ this . removeAndCleanupTarget ( targetId , error ) ;
1009
1038
break ;
1010
1039
}
1011
1040
default :
@@ -1025,7 +1054,7 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
1025
1054
1026
1055
for ( const targetId of added ) {
1027
1056
assert (
1028
- ! this . queryViewsByTarget [ targetId ] ,
1057
+ ! this . queriesByTarget [ targetId ] ,
1029
1058
'Trying to add an already active target'
1030
1059
) ;
1031
1060
const target = await this . localStore . getTarget ( targetId ) ;
@@ -1040,18 +1069,20 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
1040
1069
}
1041
1070
1042
1071
for ( const targetId of removed ) {
1043
- const queryView = this . queryViewsByTarget [ targetId ] ;
1044
- // Check that the query is still active since the query might have been
1072
+ // Check that the target is still active since the target might have been
1045
1073
// removed if it has been rejected by the backend.
1046
- if ( queryView ) {
1047
- await this . localStore
1048
- . releaseQuery ( queryView . query , /*keepPersistedQueryData=*/ false )
1049
- . then ( ( ) => {
1050
- this . remoteStore . unlisten ( targetId ) ;
1051
- this . removeAndCleanupQuery ( queryView ) ;
1052
- } )
1053
- . catch ( ignoreIfPrimaryLeaseLoss ) ;
1074
+ if ( ! this . queriesByTarget [ targetId ] ) {
1075
+ continue ;
1054
1076
}
1077
+
1078
+ // Release queries that are still active.
1079
+ await this . localStore
1080
+ . releaseTarget ( targetId , /* keepPersistedQueryData */ false )
1081
+ . then ( ( ) => {
1082
+ this . remoteStore . unlisten ( targetId ) ;
1083
+ this . removeAndCleanupTarget ( targetId ) ;
1084
+ } )
1085
+ . catch ( ignoreIfPrimaryLeaseLoss ) ;
1055
1086
}
1056
1087
}
1057
1088
@@ -1074,9 +1105,17 @@ export class SyncEngine implements RemoteSyncer, SharedClientStateSyncer {
1074
1105
if ( limboResolution && limboResolution . receivedDocument ) {
1075
1106
return documentKeySet ( ) . add ( limboResolution . key ) ;
1076
1107
} else {
1077
- return this . queryViewsByTarget [ targetId ]
1078
- ? this . queryViewsByTarget [ targetId ] . view . syncedDocuments
1079
- : documentKeySet ( ) ;
1108
+ let keySet = documentKeySet ( ) ;
1109
+ const queries = this . queriesByTarget [ targetId ] ;
1110
+ if ( ! queries ) {
1111
+ return keySet ;
1112
+ }
1113
+ for ( const query of queries ) {
1114
+ const queryView = this . queryViewsByQuery . get ( query ) ;
1115
+ assert ( ! ! queryView , `No query view found for ${ query } ` ) ;
1116
+ keySet = keySet . unionWith ( queryView ! . view . syncedDocuments ) ;
1117
+ }
1118
+ return keySet ;
1080
1119
}
1081
1120
}
1082
1121
}
0 commit comments