chore: improve test execution speed #3245

Merged: 2 commits, Aug 7, 2024
@@ -3859,15 +3859,16 @@ public void testBatchCreateSessionsFailure_shouldNotPropagateToCloseMethod() {
try {
// Simulate session creation failures on the backend.
mockSpanner.setBatchCreateSessionsExecutionTime(
SimulatedExecutionTime.ofStickyException(Status.RESOURCE_EXHAUSTED.asRuntimeException()));
SimulatedExecutionTime.ofStickyException(
Status.FAILED_PRECONDITION.asRuntimeException()));
DatabaseClient client =
spannerWithEmptySessionPool.getDatabaseClient(
DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
// This will not cause any failure as getting a session from the pool is guaranteed to be
// non-blocking, and any exceptions will be delayed until actual query execution.
try (ResultSet rs = client.singleUse().executeQuery(SELECT1)) {
SpannerException e = assertThrows(SpannerException.class, rs::next);
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.RESOURCE_EXHAUSTED);
assertThat(e.getErrorCode()).isEqualTo(ErrorCode.FAILED_PRECONDITION);
}
} finally {
mockSpanner.setBatchCreateSessionsExecutionTime(SimulatedExecutionTime.none());
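
As an aside on the error-code change: FAILED_PRECONDITION is presumably used because the client does not retry it, so the sticky error surfaces on the first attempt instead of after retry backoff on RESOURCE_EXHAUSTED, which is what shortens the test. The same arrange/assert/restore pattern could be factored into a hypothetical helper like the sketch below (mockSpanner, spannerWithEmptySessionPool, SELECT1 and the TEST_* constants are the fixture already shown in this diff):

// Hypothetical helper, not part of the PR: simulate a sticky BatchCreateSessions failure,
// assert that it surfaces on rs.next(), then restore the mock for subsequent tests.
private void assertQueryFailsWithStickySessionCreationError(Status status, ErrorCode expectedCode) {
  try {
    mockSpanner.setBatchCreateSessionsExecutionTime(
        SimulatedExecutionTime.ofStickyException(status.asRuntimeException()));
    DatabaseClient client =
        spannerWithEmptySessionPool.getDatabaseClient(
            DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE));
    // Session checkout is non-blocking, so the simulated failure only surfaces on execution.
    try (ResultSet rs = client.singleUse().executeQuery(SELECT1)) {
      SpannerException e = assertThrows(SpannerException.class, rs::next);
      assertThat(e.getErrorCode()).isEqualTo(expectedCode);
    }
  } finally {
    mockSpanner.setBatchCreateSessionsExecutionTime(SimulatedExecutionTime.none());
  }
}

With such a helper the test body above reduces to a single call with Status.FAILED_PRECONDITION and ErrorCode.FAILED_PRECONDITION.
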
@@ -74,6 +74,7 @@
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.cloud.spanner.spi.v1.SpannerRpc.ResultStreamConsumer;
import com.google.cloud.spanner.v1.stub.SpannerStubSettings;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
@@ -2022,14 +2023,16 @@ public void testOpenCensusMetricsDisable() {
public void testOpenTelemetrySessionMetrics() throws Exception {
SpannerOptions.resetActiveTracingFramework();
SpannerOptions.enableOpenTelemetryMetrics();
// Create a session pool with max 2 sessions and a low timeout for waiting for a session.
// Create a session pool with max 3 sessions and a low timeout for waiting for a session.
if (minSessions == 1) {
options =
SessionPoolOptions.newBuilder()
.setMinSessions(1)
.setMaxSessions(3)
.setMaxIdleSessions(0)
.setInitialWaitForSessionTimeoutMillis(50L)
// This must be set to null for the setInitialWaitForSessionTimeoutMillis call to have
// any effect.
.setAcquireSessionTimeout(null)
.setInitialWaitForSessionTimeoutMillis(1L)
.build();
FakeClock clock = new FakeClock();
clock.currentTimeMillis.set(System.currentTimeMillis());
@@ -2080,26 +2083,29 @@ public void testOpenTelemetrySessionMetrics() throws Exception {
Future<Void> fut =
executor.submit(
() -> {
PooledSessionFuture session = pool.getSession();
latch.countDown();
Session session = pool.getSession();
session.get();
session.close();
return null;
});
// Wait until the background thread is actually waiting for a session.
latch.await();
// Wait until the request has timed out.
int waitCount = 0;
while (pool.getNumWaiterTimeouts() == 0L && waitCount < 1000) {
Thread.sleep(5L);
waitCount++;
Stopwatch watch = Stopwatch.createStarted();
while (pool.getNumWaiterTimeouts() == 0L && watch.elapsed(TimeUnit.MILLISECONDS) < 100) {
Thread.yield();
}
assertTrue(pool.getNumWaiterTimeouts() > 0);
// Return the checked out session to the pool so the async request will get a session and
// finish.
session2.close();
// Verify that the async request also succeeds.
fut.get(10L, TimeUnit.SECONDS);
executor.shutdown();
assertTrue(executor.awaitTermination(10L, TimeUnit.SECONDS));

inMemoryMetricReader.forceFlush();
metricDataCollection = inMemoryMetricReader.collectAllMetrics();

// Max Allowed sessions should be 3
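
The main speed-up in this test is replacing the fixed 5 ms Thread.sleep polling with a Stopwatch-bounded busy-wait that returns as soon as pool.getNumWaiterTimeouts() becomes non-zero. The same pattern could be factored into a small hypothetical helper; only the Guava Stopwatch API below is taken from the diff, the WaitUtil names are illustrative:

import com.google.common.base.Stopwatch;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper: yield-based polling with a hard deadline instead of fixed sleeps.
final class WaitUtil {
  private WaitUtil() {}

  // Returns true as soon as the condition holds, or false once the deadline expires.
  static boolean waitFor(BooleanSupplier condition, Duration deadline) {
    Stopwatch watch = Stopwatch.createStarted();
    while (!condition.getAsBoolean()) {
      if (watch.elapsed(TimeUnit.MILLISECONDS) >= deadline.toMillis()) {
        return false;
      }
      Thread.yield();
    }
    return true;
  }
}

The wait loop in the diff would then read assertTrue(WaitUtil.waitFor(() -> pool.getNumWaiterTimeouts() > 0, Duration.ofMillis(100))).
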
@@ -17,6 +17,7 @@
package com.google.cloud.spanner.spi.v1;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import com.google.auth.oauth2.AccessToken;
import com.google.auth.oauth2.OAuth2Credentials;
@@ -27,6 +28,8 @@
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.protobuf.ListValue;
import com.google.spanner.v1.ResultSetMetadata;
import com.google.spanner.v1.StructType;
@@ -47,7 +50,6 @@
import io.opencensus.tags.TagValue;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -80,26 +82,22 @@ public class GfeLatencyTest {

private static MockSpannerServiceImpl mockSpanner;
private static Server server;
private static InetSocketAddress address;
private static Spanner spanner;
private static DatabaseClient databaseClient;

private static final Map<SpannerRpc.Option, Object> optionsMap = new HashMap<>();

private static MockSpannerServiceImpl mockSpannerNoHeader;
private static Server serverNoHeader;
private static InetSocketAddress addressNoHeader;
private static Spanner spannerNoHeader;
private static DatabaseClient databaseClientNoHeader;

private static String instanceId = "fake-instance";
private static String databaseId = "fake-database";
private static String projectId = "fake-project";
private static final String INSTANCE_ID = "fake-instance";
private static final String DATABASE_ID = "fake-database";
private static final String PROJECT_ID = "fake-project";

private static final long WAIT_FOR_METRICS_TIME_MS = 1_000;
private static final int MAXIMUM_RETRIES = 5;
private static final int MAXIMUM_RETRIES = 50000;

private static AtomicInteger fakeServerTiming = new AtomicInteger(new Random().nextInt(1000) + 1);
private static final AtomicInteger FAKE_SERVER_TIMING =
new AtomicInteger(new Random().nextInt(1000) + 1);

private static final Statement SELECT1AND2 =
Statement.of("SELECT 1 AS COL1 UNION ALL SELECT 2 AS COL1");
@@ -135,6 +133,7 @@ public class GfeLatencyTest {

@BeforeClass
public static void startServer() throws IOException {
//noinspection deprecation
SpannerRpcViews.registerGfeLatencyAndHeaderMissingCountViews();

mockSpanner = new MockSpannerServiceImpl();
@@ -143,7 +142,7 @@ public static void startServer() throws IOException {
MockSpannerServiceImpl.StatementResult.query(SELECT1AND2, SELECT1_RESULTSET));
mockSpanner.putStatementResult(
MockSpannerServiceImpl.StatementResult.update(UPDATE_FOO_STATEMENT, 1L));
address = new InetSocketAddress("localhost", 0);
InetSocketAddress address = new InetSocketAddress("localhost", 0);
server =
NettyServerBuilder.forAddress(address)
.addService(mockSpanner)
@@ -161,7 +160,7 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
public void sendHeaders(Metadata headers) {
headers.put(
Metadata.Key.of("server-timing", Metadata.ASCII_STRING_MARSHALLER),
String.format("gfet4t7; dur=%d", fakeServerTiming.get()));
String.format("gfet4t7; dur=%d", FAKE_SERVER_TIMING.get()));
super.sendHeaders(headers);
}
},
@@ -170,25 +169,24 @@ public void sendHeaders(Metadata headers) {
})
.build()
.start();
optionsMap.put(SpannerRpc.Option.CHANNEL_HINT, 1L);
spanner = createSpannerOptions(address, server).getService();
databaseClient = spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
databaseClient = spanner.getDatabaseClient(DatabaseId.of(PROJECT_ID, INSTANCE_ID, DATABASE_ID));

mockSpannerNoHeader = new MockSpannerServiceImpl();
mockSpannerNoHeader.setAbortProbability(0.0D);
mockSpannerNoHeader.putStatementResult(
MockSpannerServiceImpl.StatementResult.query(SELECT1AND2, SELECT1_RESULTSET));
mockSpannerNoHeader.putStatementResult(
MockSpannerServiceImpl.StatementResult.update(UPDATE_FOO_STATEMENT, 1L));
addressNoHeader = new InetSocketAddress("localhost", 0);
InetSocketAddress addressNoHeader = new InetSocketAddress("localhost", 0);
serverNoHeader =
NettyServerBuilder.forAddress(addressNoHeader)
.addService(mockSpannerNoHeader)
.build()
.start();
spannerNoHeader = createSpannerOptions(addressNoHeader, serverNoHeader).getService();
databaseClientNoHeader =
spannerNoHeader.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
spannerNoHeader.getDatabaseClient(DatabaseId.of(PROJECT_ID, INSTANCE_ID, DATABASE_ID));
}
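
For reference, the interceptor registered in startServer() sends a server-timing header of the form "gfet4t7; dur=<millis>", and that dur value is what the GFE latency assertions later compare against. A hypothetical parser for that format is sketched below; the class and method names are illustrative, only the header format comes from the test above:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical parser for the "gfet4t7; dur=<millis>" value produced by the mock server above.
final class ServerTimingHeader {
  private static final Pattern DUR = Pattern.compile("dur=(\\d+)");

  private ServerTimingHeader() {}

  // Extracts the duration in milliseconds, e.g. parseDurationMillis("gfet4t7; dur=42") == 42.
  static long parseDurationMillis(String headerValue) {
    Matcher matcher = DUR.matcher(headerValue);
    if (!matcher.find()) {
      throw new IllegalArgumentException("No dur= component in server-timing value: " + headerValue);
    }
    return Long.parseLong(matcher.group(1));
  }
}
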

@AfterClass
@@ -221,12 +219,9 @@ public void testGfeLatencyExecuteStreamingSql() throws InterruptedException {
long latency =
getMetric(
SpannerRpcViews.SPANNER_GFE_LATENCY_VIEW,
projectId,
instanceId,
databaseId,
"google.spanner.v1.Spanner/ExecuteStreamingSql",
false);
assertEquals(fakeServerTiming.get(), latency);
assertEquals(FAKE_SERVER_TIMING.get(), latency);
}

@Test
@@ -238,12 +233,9 @@ public void testGfeLatencyExecuteSql() throws InterruptedException {
long latency =
getMetric(
SpannerRpcViews.SPANNER_GFE_LATENCY_VIEW,
projectId,
instanceId,
databaseId,
"google.spanner.v1.Spanner/ExecuteSql",
false);
assertEquals(fakeServerTiming.get(), latency);
assertEquals(FAKE_SERVER_TIMING.get(), latency);
}

@Test
@@ -254,9 +246,6 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc
long count =
getMetric(
SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW,
projectId,
instanceId,
databaseId,
"google.spanner.v1.Spanner/ExecuteStreamingSql",
false);
assertEquals(0, count);
@@ -267,9 +256,6 @@ public void testGfeMissingHeaderCountExecuteStreamingSql() throws InterruptedExc
long count1 =
getMetric(
SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW,
projectId,
instanceId,
databaseId,
"google.spanner.v1.Spanner/ExecuteStreamingSql",
true);
assertEquals(1, count1);
@@ -283,9 +269,6 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException {
long count =
getMetric(
SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW,
projectId,
instanceId,
databaseId,
"google.spanner.v1.Spanner/ExecuteSql",
false);
assertEquals(0, count);
@@ -296,9 +279,6 @@ public void testGfeMissingHeaderExecuteSql() throws InterruptedException {
long count1 =
getMetric(
SpannerRpcViews.SPANNER_GFE_HEADER_MISSING_COUNT_VIEW,
projectId,
instanceId,
databaseId,
"google.spanner.v1.Spanner/ExecuteSql",
true);
assertEquals(1, count1);
@@ -321,78 +301,75 @@ private static SpannerOptions createSpannerOptions(InetSocketAddress address, Se
}

private long getAggregationValueAsLong(AggregationData aggregationData) {
return aggregationData.match(
new io.opencensus.common.Function<AggregationData.SumDataDouble, Long>() {
@Override
public Long apply(AggregationData.SumDataDouble arg) {
return (long) arg.getSum();
}
},
new io.opencensus.common.Function<AggregationData.SumDataLong, Long>() {
@Override
public Long apply(AggregationData.SumDataLong arg) {
return arg.getSum();
}
},
new io.opencensus.common.Function<AggregationData.CountData, Long>() {
@Override
public Long apply(AggregationData.CountData arg) {
return arg.getCount();
}
},
new io.opencensus.common.Function<AggregationData.DistributionData, Long>() {
@Override
public Long apply(AggregationData.DistributionData arg) {
return (long) arg.getMean();
}
},
new io.opencensus.common.Function<AggregationData.LastValueDataDouble, Long>() {
@Override
public Long apply(AggregationData.LastValueDataDouble arg) {
return (long) arg.getLastValue();
}
},
new io.opencensus.common.Function<AggregationData.LastValueDataLong, Long>() {
@Override
public Long apply(AggregationData.LastValueDataLong arg) {
return arg.getLastValue();
}
},
new io.opencensus.common.Function<AggregationData, Long>() {
@Override
public Long apply(AggregationData arg) {
throw new UnsupportedOperationException();
}
});
return MoreObjects.firstNonNull(
aggregationData.match(
new io.opencensus.common.Function<AggregationData.SumDataDouble, Long>() {
@Override
public Long apply(AggregationData.SumDataDouble arg) {
return (long) Preconditions.checkNotNull(arg).getSum();
}
},
new io.opencensus.common.Function<AggregationData.SumDataLong, Long>() {
@Override
public Long apply(AggregationData.SumDataLong arg) {
return Preconditions.checkNotNull(arg).getSum();
}
},
new io.opencensus.common.Function<AggregationData.CountData, Long>() {
@Override
public Long apply(AggregationData.CountData arg) {
return Preconditions.checkNotNull(arg).getCount();
}
},
new io.opencensus.common.Function<AggregationData.DistributionData, Long>() {
@Override
public Long apply(AggregationData.DistributionData arg) {
return (long) Preconditions.checkNotNull(arg).getMean();
}
},
new io.opencensus.common.Function<AggregationData.LastValueDataDouble, Long>() {
@Override
public Long apply(AggregationData.LastValueDataDouble arg) {
return (long) Preconditions.checkNotNull(arg).getLastValue();
}
},
new io.opencensus.common.Function<AggregationData.LastValueDataLong, Long>() {
@Override
public Long apply(AggregationData.LastValueDataLong arg) {
return Preconditions.checkNotNull(arg).getLastValue();
}
},
new io.opencensus.common.Function<AggregationData, Long>() {
@Override
public Long apply(AggregationData arg) {
throw new UnsupportedOperationException();
}
}),
-1L);
}
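
Since io.opencensus.common.Function is a single-method interface, the anonymous classes above can also be written as lambdas and method references. An essentially equivalent sketch, offered only as a readability alternative and not part of this PR (the Preconditions.checkNotNull calls are omitted for brevity):

private long getAggregationValueAsLong(AggregationData aggregationData) {
  return MoreObjects.firstNonNull(
      aggregationData.<Long>match(
          sumDouble -> (long) sumDouble.getSum(),
          AggregationData.SumDataLong::getSum,
          AggregationData.CountData::getCount,
          distribution -> (long) distribution.getMean(),
          lastValueDouble -> (long) lastValueDouble.getLastValue(),
          AggregationData.LastValueDataLong::getLastValue,
          unsupported -> {
            throw new UnsupportedOperationException();
          }),
      -1L);
}
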

private long getMetric(
View view,
String projectId,
String instanceId,
String databaseId,
String method,
boolean withOverride)
throws InterruptedException {
private long getMetric(View view, String method, boolean withOverride) {
List<TagValue> tagValues = new java.util.ArrayList<>();
for (TagKey column : view.getColumns()) {
if (column == SpannerRpcViews.INSTANCE_ID) {
tagValues.add(TagValue.create(instanceId));
tagValues.add(TagValue.create(INSTANCE_ID));
} else if (column == SpannerRpcViews.DATABASE_ID) {
tagValues.add(TagValue.create(databaseId));
tagValues.add(TagValue.create(DATABASE_ID));
} else if (column == SpannerRpcViews.METHOD) {
tagValues.add(TagValue.create(method));
} else if (column == SpannerRpcViews.PROJECT_ID) {
tagValues.add(TagValue.create(projectId));
tagValues.add(TagValue.create(PROJECT_ID));
}
}
for (int i = 0; i < MAXIMUM_RETRIES; i++) {
Thread.sleep(WAIT_FOR_METRICS_TIME_MS);
Thread.yield();
ViewData viewData = SpannerRpcViews.viewManager.getView(view.getName());
assertNotNull(viewData);
if (viewData.getAggregationMap() != null) {
Map<List<TagValue>, AggregationData> aggregationMap = viewData.getAggregationMap();
AggregationData aggregationData = aggregationMap.get(tagValues);
if (withOverride && getAggregationValueAsLong(aggregationData) == 0) {
if (aggregationData == null
|| withOverride && getAggregationValueAsLong(aggregationData) == 0) {
continue;
}
return getAggregationValueAsLong(aggregationData);