Skip to content

Add QueryData.with() helpers #687

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,9 @@ public ImmutableSortedMap<DocumentKey, MaybeDocument> applyRemoteEvent(RemoteEve
if (!resumeToken.isEmpty()) {
QueryData oldQueryData = queryData;
queryData =
queryData.copy(remoteEvent.getSnapshotVersion(), resumeToken, sequenceNumber);
queryData
.withResumeToken(resumeToken, remoteEvent.getSnapshotVersion())
.withSequenceNumber(sequenceNumber);
targetIds.put(boxedTargetId, queryData);

if (shouldPersistQueryData(oldQueryData, queryData, change)) {
Expand Down Expand Up @@ -488,14 +490,7 @@ public void notifyLocalViewChanges(List<LocalViewChanges> viewChanges) {
// Advance the last limbo free snapshot version
SnapshotVersion lastLimboFreeSnapshotVersion = queryData.getSnapshotVersion();
QueryData updatedQueryData =
new QueryData(
queryData.getQuery(),
queryData.getTargetId(),
queryData.getSequenceNumber(),
queryData.getPurpose(),
queryData.getSnapshotVersion(),
lastLimboFreeSnapshotVersion,
queryData.getResumeToken());
queryData.withLastLimboFreeSnapshotVersion(lastLimboFreeSnapshotVersion);
targetIds.put(targetId, updatedQueryData);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ public void removeMutationReference(DocumentKey key) {

@Override
public void removeTarget(QueryData queryData) {
QueryData updated =
queryData.copy(
queryData.getSnapshotVersion(), queryData.getResumeToken(), getCurrentSequenceNumber());
QueryData updated = queryData.withSequenceNumber(getCurrentSequenceNumber());
persistence.getQueryCache().updateQueryData(updated);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public final class QueryData {
* after disconnecting without retransmitting all the data that matches the query. The resume
* token essentially identifies a point in time from which the server should resume sending
*/
public QueryData(
QueryData(
Query query,
int targetId,
long sequenceNumber,
Expand Down Expand Up @@ -75,6 +75,42 @@ public QueryData(Query query, int targetId, long sequenceNumber, QueryPurpose pu
WatchStream.EMPTY_RESUME_TOKEN);
}

/** Creates a new query data instance with an updated sequence number. */
public QueryData withSequenceNumber(long sequenceNumber) {
return new QueryData(
query,
targetId,
sequenceNumber,
purpose,
snapshotVersion,
lastLimboFreeSnapshotVersion,
resumeToken);
}

/** Creates a new query data instance with an updated resume token and snapshot version. */
public QueryData withResumeToken(ByteString resumeToken, SnapshotVersion snapshotVersion) {
return new QueryData(
query,
targetId,
sequenceNumber,
purpose,
snapshotVersion,
lastLimboFreeSnapshotVersion,
resumeToken);
}

/** Creates a new query data instance with an updated last limbo free snapshot version number. */
public QueryData withLastLimboFreeSnapshotVersion(SnapshotVersion lastLimboFreeSnapshotVersion) {
return new QueryData(
query,
targetId,
sequenceNumber,
purpose,
snapshotVersion,
lastLimboFreeSnapshotVersion,
resumeToken);
}

public Query getQuery() {
return query;
}
Expand Down Expand Up @@ -156,17 +192,4 @@ public String toString() {
+ resumeToken
+ '}';
}

/** Creates a new query data instance with an updated snapshot version and resume token. */
public QueryData copy(
SnapshotVersion snapshotVersion, ByteString resumeToken, long sequenceNumber) {
return new QueryData(
query,
targetId,
sequenceNumber,
purpose,
snapshotVersion,
lastLimboFreeSnapshotVersion,
resumeToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,7 @@ public int removeOrphanedDocuments(long upperBound) {

@Override
public void removeTarget(QueryData queryData) {
QueryData updated =
queryData.copy(
queryData.getSnapshotVersion(), queryData.getResumeToken(), getCurrentSequenceNumber());
QueryData updated = queryData.withSequenceNumber(getCurrentSequenceNumber());
persistence.getQueryCache().updateQueryData(updated);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,9 +504,7 @@ private void raiseWatchSnapshot(SnapshotVersion snapshotVersion) {
// A watched target might have been removed already.
if (queryData != null) {
this.listenTargets.put(
targetId,
queryData.copy(
snapshotVersion, targetChange.getResumeToken(), queryData.getSequenceNumber()));
targetId, queryData.withResumeToken(targetChange.getResumeToken(), snapshotVersion));
}
}
}
Expand All @@ -519,9 +517,7 @@ private void raiseWatchSnapshot(SnapshotVersion snapshotVersion) {
if (queryData != null) {
// Clear the resume token for the query, since we're in a known mismatch state.
this.listenTargets.put(
targetId,
queryData.copy(
queryData.getSnapshotVersion(), ByteString.EMPTY, queryData.getSequenceNumber()));
targetId, queryData.withResumeToken(ByteString.EMPTY, queryData.getSnapshotVersion()));

// Cause a hard reset by unwatching and rewatching immediately, but deliberately don't send
// a resume token so that we get a full update.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ private void updateTargetInTransaction(QueryData queryData) {
SnapshotVersion version = version(2);
ByteString resumeToken = resumeToken(2);
QueryData updated =
queryData.copy(
version, resumeToken, persistence.getReferenceDelegate().getCurrentSequenceNumber());
queryData
.withResumeToken(resumeToken, version)
.withSequenceNumber(persistence.getReferenceDelegate().getCurrentSequenceNumber());
queryCache.updateQueryData(updated);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,7 @@ public void watchQuery(QueryData queryData) {
+ ")");
// Snapshot version is ignored on the wire
QueryData sentQueryData =
queryData.copy(
SnapshotVersion.NONE, queryData.getResumeToken(), queryData.getSequenceNumber());
queryData.withResumeToken(queryData.getResumeToken(), SnapshotVersion.NONE);
watchStreamRequestCount += 1;
this.activeTargets.put(queryData.getTargetId(), sentQueryData);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -468,16 +468,7 @@ public void testEncodesListenRequestLabels() {
@Test
public void testEncodesFirstLevelKeyQueries() {
Query q = Query.atPath(ResourcePath.fromString("docs/1"));
Target actual =
serializer.encodeTarget(
new QueryData(
q,
1,
2,
QueryPurpose.LISTEN,
SnapshotVersion.NONE,
SnapshotVersion.NONE,
WatchStream.EMPTY_RESUME_TOKEN));
Target actual = serializer.encodeTarget(new QueryData(q, 1, 2, QueryPurpose.LISTEN));

DocumentsTarget.Builder docs =
DocumentsTarget.newBuilder().addDocuments("projects/p/databases/d/documents/docs/1");
Expand Down Expand Up @@ -881,16 +872,10 @@ public void testEncodesBounds() {
@Test
public void testEncodesResumeTokens() {
Query q = Query.atPath(ResourcePath.fromString("docs"));
Target actual =
serializer.encodeTarget(
new QueryData(
q,
1,
2,
QueryPurpose.LISTEN,
SnapshotVersion.NONE,
SnapshotVersion.NONE,
TestUtil.resumeToken(1000)));
QueryData queryData =
new QueryData(q, 1, 2, QueryPurpose.LISTEN)
.withResumeToken(TestUtil.resumeToken(1000), SnapshotVersion.NONE);
Target actual = serializer.encodeTarget(queryData);

StructuredQuery.Builder structuredQueryBuilder =
StructuredQuery.newBuilder()
Expand All @@ -917,14 +902,7 @@ public void testEncodesResumeTokens() {
* QueryData, but for the most part we're just testing variations on Query.
*/
private QueryData wrapQueryData(Query query) {
return new QueryData(
query,
1,
2,
QueryPurpose.LISTEN,
SnapshotVersion.NONE,
SnapshotVersion.NONE,
WatchStream.EMPTY_RESUME_TOKEN);
return new QueryData(query, 1, 2, QueryPurpose.LISTEN);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -879,16 +879,10 @@ private void validateStateExpectations(@Nullable JSONObject expected) throws JSO
// TODO: populate the purpose of the target once it's possible to encode that in the
// spec tests. For now, hard-code that it's a listen despite the fact that it's not always
// the right value.
expectedActiveTargets.put(
targetId,
new QueryData(
query,
targetId,
ARBITRARY_SEQUENCE_NUMBER,
QueryPurpose.LISTEN,
SnapshotVersion.NONE,
SnapshotVersion.NONE,
ByteString.copyFromUtf8(resumeToken)));
QueryData queryData =
new QueryData(query, targetId, ARBITRARY_SEQUENCE_NUMBER, QueryPurpose.LISTEN)
.withResumeToken(ByteString.copyFromUtf8(resumeToken), SnapshotVersion.NONE);
expectedActiveTargets.put(targetId, queryData);
}
}
}
Expand Down