Skip to content

perf: minor optimizations to the standard query path #3101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
implementation platform('com.google.cloud:libraries-bom:26.38.0')
implementation platform('com.google.cloud:libraries-bom:26.39.0')

implementation 'com.google.cloud:google-cloud-spanner'
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import com.google.spanner.v1.DirectedReadOptions;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.ExecuteSqlRequest.Builder;
import com.google.spanner.v1.ExecuteSqlRequest.QueryMode;
import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions;
import com.google.spanner.v1.PartialResultSet;
Expand Down Expand Up @@ -457,7 +458,7 @@ void initTransaction() {

// A per-transaction sequence number used to identify this ExecuteSqlRequests. Required for DML,
// ignored for query by the server.
private AtomicLong seqNo = new AtomicLong();
private final AtomicLong seqNo = new AtomicLong();

// Allow up to 512MB to be buffered (assuming 1MB chunks). In practice, restart tokens are sent
// much more frequently.
Expand Down Expand Up @@ -488,6 +489,10 @@ long getSeqNo() {
return seqNo.incrementAndGet();
}

protected boolean isReadOnly() {
return true;
}

protected boolean isRouteToLeader() {
return false;
}
Expand Down Expand Up @@ -622,19 +627,18 @@ private ResultSet executeQueryInternal(
@VisibleForTesting
QueryOptions buildQueryOptions(QueryOptions requestOptions) {
// Shortcut for the most common return value.
if (defaultQueryOptions.equals(QueryOptions.getDefaultInstance()) && requestOptions == null) {
return QueryOptions.getDefaultInstance();
if (requestOptions == null) {
return defaultQueryOptions;
}
// Create a builder based on the default query options.
QueryOptions.Builder builder = defaultQueryOptions.toBuilder();
// Then overwrite with specific options for this query.
if (requestOptions != null) {
builder.mergeFrom(requestOptions);
}
return builder.build();
return defaultQueryOptions.toBuilder().mergeFrom(requestOptions).build();
}

RequestOptions buildRequestOptions(Options options) {
// Shortcut for the most common return value.
if (!(options.hasPriority() || options.hasTag() || getTransactionTag() != null)) {
return RequestOptions.getDefaultInstance();
}

RequestOptions.Builder builder = RequestOptions.newBuilder();
if (options.hasPriority()) {
builder.setPriority(options.priority());
Expand All @@ -655,16 +659,7 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
.setSql(statement.getSql())
.setQueryMode(queryMode)
.setSession(session.getName());
Map<String, Value> stmtParameters = statement.getParameters();
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), Value.toProto(param.getValue()));
if (param.getValue() != null && param.getValue().getType() != null) {
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
}
addParameters(builder, statement.getParameters());
if (withTransactionSelector) {
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
Expand All @@ -679,12 +674,26 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
} else if (defaultDirectedReadOptions != null) {
builder.setDirectedReadOptions(defaultDirectedReadOptions);
}
builder.setSeqno(getSeqNo());
if (!isReadOnly()) {
builder.setSeqno(getSeqNo());
}
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
builder.setRequestOptions(buildRequestOptions(options));
return builder;
}

static void addParameters(ExecuteSqlRequest.Builder builder, Map<String, Value> stmtParameters) {
if (!stmtParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = builder.getParamsBuilder();
for (Map.Entry<String, Value> param : stmtParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), Value.toProto(param.getValue()));
if (param.getValue() != null && param.getValue().getType() != null) {
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
}
}

ExecuteBatchDmlRequest.Builder getExecuteBatchDmlRequestBuilder(
Iterable<Statement> statements, Options options) {
ExecuteBatchDmlRequest.Builder builder =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.spanner;

import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.spanner.SpannerOptions.FixedCloseableExecutorProvider;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import org.threeten.bp.Duration;

public class LatencyTest {

public static void main(String[] args) throws Exception {
ThreadFactory threadFactory =
ThreadFactoryUtil.tryCreateVirtualThreadFactory("spanner-async-worker");
if (threadFactory == null) {
return;
}
ScheduledExecutorService service = Executors.newScheduledThreadPool(0, threadFactory);
Spanner spanner =
SpannerOptions.newBuilder()
.setCredentials(
GoogleCredentials.fromStream(
Files.newInputStream(
Paths.get("/Users/loite/Downloads/appdev-soda-spanner-staging.json"))))
.setSessionPoolOption(
SessionPoolOptions.newBuilder()
.setWaitForMinSessions(Duration.ofSeconds(5L))
// .setUseMultiplexedSession(true)
.build())
.setUseVirtualThreads(true)
.setAsyncExecutorProvider(FixedCloseableExecutorProvider.create(service))
.build()
.getService();
DatabaseClient client =
spanner.getDatabaseClient(
DatabaseId.of("appdev-soda-spanner-staging", "knut-test-ycsb", "latencytest"));
for (int i = 0; i < 1000000; i++) {
try (AsyncResultSet resultSet =
client
.singleUse()
.executeQueryAsync(
Statement.newBuilder("select col_varchar from latency_test where col_bigint=$1")
.bind("p1")
.to(ThreadLocalRandom.current().nextLong(100000L))
.build())) {
while (resultSet.next()) {
for (int col = 0; col < resultSet.getColumnCount(); col++) {
if (resultSet.getValue(col) == null) {
throw new IllegalStateException();
}
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,6 @@ private ByteString initTransaction(final Options options) {
private void setParameters(
final ExecuteSqlRequest.Builder requestBuilder,
final Map<String, Value> statementParameters) {
if (!statementParameters.isEmpty()) {
com.google.protobuf.Struct.Builder paramsBuilder = requestBuilder.getParamsBuilder();
for (Map.Entry<String, Value> param : statementParameters.entrySet()) {
paramsBuilder.putFields(param.getKey(), Value.toProto(param.getValue()));
if (param.getValue() != null && param.getValue().getType() != null) {
requestBuilder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
}
AbstractReadContext.addParameters(requestBuilder, statementParameters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ abstract class ResumableStreamIterator extends AbstractIterator<PartialResultSet
private final RetrySettings streamingRetrySettings;
private final Set<Code> retryableCodes;
private static final Logger logger = Logger.getLogger(ResumableStreamIterator.class.getName());
private final BackOff backOff;
private BackOff backOff;
private final LinkedList<PartialResultSet> buffer = new LinkedList<>();
private final int maxBufferSize;
private final ISpan span;
Expand Down Expand Up @@ -87,7 +87,6 @@ protected ResumableStreamIterator(
this.span = tracer.spanBuilderWithExplicitParent(streamName, parent);
this.streamingRetrySettings = Preconditions.checkNotNull(streamingRetrySettings);
this.retryableCodes = Preconditions.checkNotNull(retryableCodes);
this.backOff = newBackOff();
}

private ExponentialBackOff newBackOff() {
Expand Down Expand Up @@ -252,7 +251,10 @@ protected PartialResultSet computeNext() {
if (delay != -1) {
backoffSleep(context, delay);
} else {
backoffSleep(context, backOff);
if (this.backOff == null) {
this.backOff = newBackOff();
}
backoffSleep(context, this.backOff);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ interface SessionTransaction {
void close();
}

private static final Map<SpannerRpc.Option, ?>[] CHANNEL_HINT_OPTIONS =
new Map[SpannerOptions.MAX_CHANNELS];

static {
for (int i = 0; i < CHANNEL_HINT_OPTIONS.length; i++) {
CHANNEL_HINT_OPTIONS[i] = optionMap(SessionOption.channelHint(i));
}
}

static final int NO_CHANNEL_HINT = -1;

private final SpannerImpl spanner;
Expand All @@ -125,7 +134,7 @@ interface SessionTransaction {
if (channelHint == NO_CHANNEL_HINT) {
return sessionReference.getOptions();
}
return optionMap(SessionOption.channelHint(channelHint));
return CHANNEL_HINT_OPTIONS[channelHint % CHANNEL_HINT_OPTIONS.length];
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,7 @@ Builder setPoolMaintainerClock(Clock poolMaintainerClock) {
* SessionPoolOptions#maxSessions} based on the traffic load. Failing to do so will result in
* higher latencies.
*/
Builder setUseMultiplexedSession(boolean useMultiplexedSession) {
public Builder setUseMultiplexedSession(boolean useMultiplexedSession) {
this.useMultiplexedSession = useMultiplexedSession;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
ImmutableSet.of(
"https://www.googleapis.com/auth/spanner.admin",
"https://www.googleapis.com/auth/spanner.data");
private static final int MAX_CHANNELS = 256;
static final int MAX_CHANNELS = 256;
@VisibleForTesting static final int DEFAULT_CHANNELS = 4;
// Set the default number of channels to GRPC_GCP_ENABLED_DEFAULT_CHANNELS when gRPC-GCP extension
// is enabled, to make sure there are sufficient channels available to move the sessions to a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ private TransactionContextImpl(Builder builder) {
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
}

@Override
protected boolean isReadOnly() {
return false;
}

@Override
protected boolean isRouteToLeader() {
return true;
Expand Down
Loading