30
30
import com .google .protobuf .InvalidProtocolBufferException ;
31
31
import com .google .protobuf .MessageLite ;
32
32
import java .util .ArrayList ;
33
+ import java .util .Collections ;
33
34
import java .util .HashMap ;
34
35
import java .util .List ;
35
36
import java .util .Map ;
@@ -57,15 +58,16 @@ public void add(MutableDocument document, SnapshotVersion readTime) {
57
58
!readTime .equals (SnapshotVersion .NONE ),
58
59
"Cannot add document to the RemoteDocumentCache with a read time of zero" );
59
60
60
- String path = pathForKey ( document .getKey () );
61
+ DocumentKey documentKey = document .getKey ();
61
62
Timestamp timestamp = readTime .getTimestamp ();
62
63
MessageLite message = serializer .encodeMaybeDocument (document );
63
64
64
65
db .execute (
65
66
"INSERT OR REPLACE INTO remote_documents "
66
- + "(path, read_time_seconds, read_time_nanos, contents) "
67
- + "VALUES (?, ?, ?, ?)" ,
68
- path ,
67
+ + "(collection_path, document_id, read_time_seconds, read_time_nanos, contents) "
68
+ + "VALUES (?, ?, ?, ?, ?)" ,
69
+ collectionForKey (documentKey ),
70
+ documentKey .getPath ().getLastSegment (),
69
71
timestamp .getSeconds (),
70
72
timestamp .getNanoseconds (),
71
73
message .toByteArray ());
@@ -75,56 +77,63 @@ public void add(MutableDocument document, SnapshotVersion readTime) {
75
77
76
78
@ Override
77
79
public void remove (DocumentKey documentKey ) {
78
- String path = pathForKey (documentKey );
79
-
80
- db .execute ("DELETE FROM remote_documents WHERE path = ?" , path );
80
+ db .execute (
81
+ "DELETE FROM remote_documents WHERE collection_path = ? AND document_id = ?" ,
82
+ collectionForKey (documentKey ),
83
+ documentKey .getPath ().getLastSegment ());
81
84
}
82
85
83
86
@ Override
84
87
public MutableDocument get (DocumentKey documentKey ) {
85
- String path = pathForKey (documentKey );
86
-
87
88
MutableDocument document =
88
89
db .query (
89
- "SELECT contents, read_time_seconds, read_time_nanos "
90
- + "FROM remote_documents "
91
- + "WHERE path = ?" )
92
- .binding (path )
90
+ "SELECT contents, read_time_seconds, read_time_nanos FROM remote_documents "
91
+ + "WHERE collection_path = ? AND document_id = ?" )
92
+ .binding (collectionForKey (documentKey ), documentKey .getPath ().getLastSegment ())
93
93
.firstValue (row -> decodeMaybeDocument (row .getBlob (0 ), row .getInt (1 ), row .getInt (2 )));
94
94
return document != null ? document : MutableDocument .newInvalidDocument (documentKey );
95
95
}
96
96
97
97
@ Override
98
98
public Map <DocumentKey , MutableDocument > getAll (Iterable <DocumentKey > documentKeys ) {
99
- List <Object > args = new ArrayList <>();
100
- for (DocumentKey key : documentKeys ) {
101
- args .add (EncodedPath .encode (key .getPath ()));
102
- }
103
-
104
99
Map <DocumentKey , MutableDocument > results = new HashMap <>();
100
+
101
+ // We issue one query by collection, so first we have to sort the keys into collection buckets.
102
+ Map <ResourcePath , List <Object >> collectionToDocumentIds = new HashMap <>();
105
103
for (DocumentKey key : documentKeys ) {
104
+ ResourcePath path = key .getPath ();
105
+ List <Object > documentIds = collectionToDocumentIds .get (path .popLast ());
106
+ if (documentIds == null ) {
107
+ documentIds = new ArrayList <>();
108
+ collectionToDocumentIds .put (path .popLast (), documentIds );
109
+ }
110
+ documentIds .add (path .getLastSegment ());
111
+
106
112
// Make sure each key has a corresponding entry, which is null in case the document is not
107
113
// found.
108
114
results .put (key , MutableDocument .newInvalidDocument (key ));
109
115
}
110
116
111
- SQLitePersistence .LongQuery longQuery =
112
- new SQLitePersistence .LongQuery (
113
- db ,
114
- "SELECT contents, read_time_seconds, read_time_nanos FROM remote_documents "
115
- + "WHERE path IN (" ,
116
- args ,
117
- ") ORDER BY path" );
118
-
119
- while (longQuery .hasMoreSubqueries ()) {
120
- longQuery
121
- .performNextSubquery ()
122
- .forEach (
123
- row -> {
124
- MutableDocument decoded =
125
- decodeMaybeDocument (row .getBlob (0 ), row .getInt (1 ), row .getInt (2 ));
126
- results .put (decoded .getKey (), decoded );
127
- });
117
+ for (Map .Entry <ResourcePath , List <Object >> entry : collectionToDocumentIds .entrySet ()) {
118
+ SQLitePersistence .LongQuery longQuery =
119
+ new SQLitePersistence .LongQuery (
120
+ db ,
121
+ "SELECT contents, read_time_seconds, read_time_nanos FROM remote_documents "
122
+ + "WHERE collection_path = ? AND document_id IN (" ,
123
+ Collections .singletonList (EncodedPath .encode (entry .getKey ())),
124
+ entry .getValue (),
125
+ ")" );
126
+
127
+ while (longQuery .hasMoreSubqueries ()) {
128
+ longQuery
129
+ .performNextSubquery ()
130
+ .forEach (
131
+ row -> {
132
+ MutableDocument decoded =
133
+ decodeMaybeDocument (row .getBlob (0 ), row .getInt (1 ), row .getInt (2 ));
134
+ results .put (decoded .getKey (), decoded );
135
+ });
136
+ }
128
137
}
129
138
130
139
return results ;
@@ -137,12 +146,7 @@ public ImmutableSortedMap<DocumentKey, MutableDocument> getAllDocumentsMatchingQ
137
146
!query .isCollectionGroupQuery (),
138
147
"CollectionGroup queries should be handled in LocalDocumentsView" );
139
148
140
- // Use the query path as a prefix for testing if a document matches the query.
141
- ResourcePath prefix = query .getPath ();
142
- int immediateChildrenPathLength = prefix .length () + 1 ;
143
-
144
- String prefixPath = EncodedPath .encode (prefix );
145
- String prefixSuccessorPath = EncodedPath .prefixSuccessor (prefixPath );
149
+ String collectionPath = EncodedPath .encode (query .getPath ());
146
150
Timestamp readTime = sinceReadTime .getTimestamp ();
147
151
148
152
BackgroundQueue backgroundQueue = new BackgroundQueue ();
@@ -155,43 +159,30 @@ public ImmutableSortedMap<DocumentKey, MutableDocument> getAllDocumentsMatchingQ
155
159
if (sinceReadTime .equals (SnapshotVersion .NONE )) {
156
160
sqlQuery =
157
161
db .query (
158
- "SELECT path, contents, read_time_seconds, read_time_nanos "
159
- + "FROM remote_documents WHERE path >= ? AND path < ?" )
160
- .binding (prefixPath , prefixSuccessorPath );
162
+ "SELECT contents, read_time_seconds, read_time_nanos "
163
+ + "FROM remote_documents WHERE collection_path = ?" )
164
+ .binding (collectionPath );
161
165
} else {
162
166
// Execute an index-free query and filter by read time. This is safe since all document
163
167
// changes to queries that have a lastLimboFreeSnapshotVersion (`sinceReadTime`) have a read
164
168
// time set.
165
169
sqlQuery =
166
170
db .query (
167
- "SELECT path, contents, read_time_seconds, read_time_nanos "
168
- + "FROM remote_documents WHERE path > = ? AND path < ? "
171
+ "SELECT contents, read_time_seconds, read_time_nanos "
172
+ + "FROM remote_documents WHERE collection_path = ? "
169
173
+ "AND (read_time_seconds > ? OR (read_time_seconds = ? AND read_time_nanos > ?))" )
170
174
.binding (
171
- prefixPath ,
172
- prefixSuccessorPath ,
175
+ collectionPath ,
173
176
readTime .getSeconds (),
174
177
readTime .getSeconds (),
175
178
readTime .getNanoseconds ());
176
179
}
177
180
sqlQuery .forEach (
178
181
row -> {
179
- // TODO: Actually implement a single-collection query
180
- //
181
- // The query is actually returning any path that starts with the query path prefix
182
- // which may include documents in subcollections. For example, a query on 'rooms'
183
- // will return rooms/abc/messages/xyx but we shouldn't match it. Fix this by
184
- // discarding rows with document keys more than one segment longer than the query
185
- // path.
186
- ResourcePath path = EncodedPath .decodeResourcePath (row .getString (0 ));
187
- if (path .length () != immediateChildrenPathLength ) {
188
- return ;
189
- }
190
-
191
182
// Store row values in array entries to provide the correct context inside the executor.
192
- final byte [] rawDocument = row .getBlob (1 );
193
- final int [] readTimeSeconds = {row .getInt (2 )};
194
- final int [] readTimeNanos = {row .getInt (3 )};
183
+ final byte [] rawDocument = row .getBlob (0 );
184
+ final int [] readTimeSeconds = {row .getInt (1 )};
185
+ final int [] readTimeNanos = {row .getInt (2 )};
195
186
196
187
// Since scheduling background tasks incurs overhead, we only dispatch to a
197
188
// background thread if there are still some documents remaining.
@@ -228,8 +219,8 @@ public SnapshotVersion getLatestReadTime() {
228
219
return latestReadTime != null ? latestReadTime : SnapshotVersion .NONE ;
229
220
}
230
221
231
- private String pathForKey (DocumentKey key ) {
232
- return EncodedPath .encode (key .getPath ());
222
+ private String collectionForKey (DocumentKey key ) {
223
+ return EncodedPath .encode (key .getPath (). popLast () );
233
224
}
234
225
235
226
private MutableDocument decodeMaybeDocument (
0 commit comments