Skip to content

Commit e75a281

Browse files
feat: move session lastUseTime parameter from PooledSession to SessionImpl class. Fix updation of the parameter for chained RPCs within one transaction. (#2704)
* fix: prevent illegal negative timeout values into thread sleep() method while retrying exceptions in unit tests. * For details on issue see - #2206 * Fixing lint issues. * refactor: move session lastUseTime parameter from PooledSession to SessionImpl class. Fix updation of the parameter for chained RPCs within one transaction. * chore: add clock instances in callees of SessionImpl. * chore: partially fix failing unit tests in SessionPoolTest and SessionPoolMaintainerTest. * chore: fix failing tests in SessionPoolStressTest. * chore: update lastUseTime for methods in SessionPoolTransactionContext. Add a couple of unit tests for testing the new behaviour. * chore: lint errors. * chore: fix tests in DatabaseClientImplTest by passing the mocked clock instance. * fix: update session lastUseTime field for AbstractReadContext class. Fix the unit test to test this change. * fix: failing tests in TransactionRunnerImplTest. * fix: failing test in SessionPoolMaintainerTest. * refactor: move FakeClock to a new class. * refactor: move Clock to a new class. * chore: resolving PR comments. * chore: address review comments. * chore: updating lastUseTime state in TransactionRunnerImpl. Removing redundant updates from SessionPool class. * chore: remove redundant update statements from SessionPool class. Add more unit tests. * chore: add more tests for TransactionRunner. * chore: remove dead code from constructor of SessionPoolTransactionContext. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java Co-authored-by: Knut Olav Løite <[email protected]> * Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java Co-authored-by: Knut Olav Løite <[email protected]> * Update google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java Co-authored-by: Knut Olav Løite <[email protected]> * chore: fixing precondition errors due to null clock. --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent e320753 commit e75a281

13 files changed

+1009
-112
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.google.cloud.spanner.SessionImpl.SessionTransaction;
3939
import com.google.cloud.spanner.spi.v1.SpannerRpc;
4040
import com.google.common.annotations.VisibleForTesting;
41+
import com.google.common.base.Preconditions;
4142
import com.google.common.util.concurrent.MoreExecutors;
4243
import com.google.protobuf.ByteString;
4344
import com.google.spanner.v1.BeginTransactionRequest;
@@ -72,6 +73,7 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractReadCon
7273
private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS;
7374
private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS;
7475
private ExecutorProvider executorProvider;
76+
private Clock clock = new Clock();
7577

7678
Builder() {}
7779

@@ -110,6 +112,11 @@ B setExecutorProvider(ExecutorProvider executorProvider) {
110112
return self();
111113
}
112114

115+
B setClock(Clock clock) {
116+
this.clock = Preconditions.checkNotNull(clock);
117+
return self();
118+
}
119+
113120
abstract T build();
114121
}
115122

@@ -392,6 +399,8 @@ void initTransaction() {
392399
private final int defaultPrefetchChunks;
393400
private final QueryOptions defaultQueryOptions;
394401

402+
private final Clock clock;
403+
395404
@GuardedBy("lock")
396405
private boolean isValid = true;
397406

@@ -416,6 +425,7 @@ void initTransaction() {
416425
this.defaultQueryOptions = builder.defaultQueryOptions;
417426
this.span = builder.span;
418427
this.executorProvider = builder.executorProvider;
428+
this.clock = builder.clock;
419429
}
420430

421431
@Override
@@ -689,6 +699,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
689699
SpannerRpc.StreamingCall call =
690700
rpc.executeQuery(
691701
request.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
702+
session.markUsed(clock.instant());
692703
call.request(prefetchChunks);
693704
stream.setCall(call, request.getTransaction().hasBegin());
694705
return stream;
@@ -826,6 +837,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
826837
SpannerRpc.StreamingCall call =
827838
rpc.read(
828839
builder.build(), stream.consumer(), session.getOptions(), isRouteToLeader());
840+
session.markUsed(clock.instant());
829841
call.request(prefetchChunks);
830842
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
831843
return stream;
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
import org.threeten.bp.Instant;
20+
21+
/**
22+
* Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8
23+
* Clock.
24+
*/
25+
class Clock {
26+
Instant instant() {
27+
return Instant.now();
28+
}
29+
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.Map;
5454
import java.util.concurrent.ExecutionException;
5555
import javax.annotation.Nullable;
56+
import org.threeten.bp.Instant;
5657

5758
/**
5859
* Implementation of {@link Session}. Sessions are managed internally by the client library, and
@@ -98,12 +99,14 @@ interface SessionTransaction {
9899
ByteString readyTransactionId;
99100
private final Map<SpannerRpc.Option, ?> options;
100101
private Span currentSpan;
102+
private volatile Instant lastUseTime;
101103

102104
SessionImpl(SpannerImpl spanner, String name, Map<SpannerRpc.Option, ?> options) {
103105
this.spanner = spanner;
104106
this.options = options;
105107
this.name = checkNotNull(name);
106108
this.databaseId = SessionId.of(name).getDatabaseId();
109+
this.lastUseTime = Instant.now();
107110
}
108111

109112
@Override
@@ -123,6 +126,14 @@ Span getCurrentSpan() {
123126
return currentSpan;
124127
}
125128

129+
Instant getLastUseTime() {
130+
return lastUseTime;
131+
}
132+
133+
void markUsed(Instant instant) {
134+
lastUseTime = instant;
135+
}
136+
126137
@Override
127138
public long executePartitionedUpdate(Statement stmt, UpdateOption... options) {
128139
setActive(null);
@@ -385,6 +396,9 @@ ApiFuture<ByteString> beginTransactionAsync(Options transactionOptions, boolean
385396
}
386397

387398
TransactionContextImpl newTransaction(Options options) {
399+
// A clock instance is passed in {@code SessionPoolOptions} in order to allow mocking via tests.
400+
final Clock poolMaintainerClock =
401+
spanner.getOptions().getSessionPoolOptions().getPoolMaintainerClock();
388402
return TransactionContextImpl.newBuilder()
389403
.setSession(this)
390404
.setOptions(options)
@@ -396,6 +410,7 @@ TransactionContextImpl newTransaction(Options options) {
396410
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
397411
.setSpan(currentSpan)
398412
.setExecutorProvider(spanner.getAsyncExecutorProvider())
413+
.setClock(poolMaintainerClock == null ? new Clock() : poolMaintainerClock)
399414
.build();
400415
}
401416

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -144,16 +144,6 @@ void maybeWaitOnMinSessions() {
144144
}
145145
}
146146

147-
/**
148-
* Wrapper around current time so that we can fake it in tests. TODO(user): Replace with Java 8
149-
* Clock.
150-
*/
151-
static class Clock {
152-
Instant instant() {
153-
return Instant.now();
154-
}
155-
}
156-
157147
private abstract static class CachedResultSetSupplier implements Supplier<ResultSet> {
158148
private ResultSet cached;
159149

@@ -1370,7 +1360,6 @@ PooledSession get(final boolean eligibleForLongRunning) {
13701360

13711361
class PooledSession implements Session {
13721362
@VisibleForTesting SessionImpl delegate;
1373-
private volatile Instant lastUseTime;
13741363
private volatile SpannerException lastException;
13751364
private volatile boolean allowReplacing = true;
13761365

@@ -1409,7 +1398,9 @@ class PooledSession implements Session {
14091398
private PooledSession(SessionImpl delegate) {
14101399
this.delegate = delegate;
14111400
this.state = SessionState.AVAILABLE;
1412-
this.lastUseTime = clock.instant();
1401+
1402+
// initialise the lastUseTime field for each session.
1403+
this.markUsed();
14131404
}
14141405

14151406
int getChannel() {
@@ -1631,7 +1622,7 @@ private void markClosing() {
16311622
}
16321623

16331624
void markUsed() {
1634-
lastUseTime = clock.instant();
1625+
delegate.markUsed(clock.instant());
16351626
}
16361627

16371628
@Override
@@ -1827,7 +1818,7 @@ private void removeIdleSessions(Instant currTime) {
18271818
Iterator<PooledSession> iterator = sessions.descendingIterator();
18281819
while (iterator.hasNext()) {
18291820
PooledSession session = iterator.next();
1830-
if (session.lastUseTime.isBefore(minLastUseTime)) {
1821+
if (session.delegate.getLastUseTime().isBefore(minLastUseTime)) {
18311822
if (session.state != SessionState.CLOSING) {
18321823
boolean isRemoved = removeFromPool(session);
18331824
if (isRemoved) {
@@ -1929,7 +1920,8 @@ private void removeLongRunningSessions(
19291920
// collection is populated only when the get() method in {@code PooledSessionFuture} is
19301921
// called.
19311922
final PooledSession session = sessionFuture.get();
1932-
final Duration durationFromLastUse = Duration.between(session.lastUseTime, currentTime);
1923+
final Duration durationFromLastUse =
1924+
Duration.between(session.delegate.getLastUseTime(), currentTime);
19331925
if (!session.eligibleForLongRunning
19341926
&& durationFromLastUse.compareTo(
19351927
inactiveTransactionRemovalOptions.getIdleTimeThreshold())
@@ -2327,7 +2319,7 @@ private PooledSession findSessionToKeepAlive(
23272319
&& (numChecked + numAlreadyChecked)
23282320
< (options.getMinSessions() + options.getMaxIdleSessions() - numSessionsInUse)) {
23292321
PooledSession session = iterator.next();
2330-
if (session.lastUseTime.isBefore(keepAliveThreshold)) {
2322+
if (session.delegate.getLastUseTime().isBefore(keepAliveThreshold)) {
23312323
iterator.remove();
23322324
return session;
23332325
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPoolOptions.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616

1717
package com.google.cloud.spanner;
1818

19-
import com.google.cloud.spanner.SessionPool.Clock;
2019
import com.google.cloud.spanner.SessionPool.Position;
2120
import com.google.common.annotations.VisibleForTesting;
2221
import com.google.common.base.Preconditions;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,19 @@ class TransactionRunnerImpl implements SessionTransaction, TransactionRunner {
8585
@VisibleForTesting
8686
static class TransactionContextImpl extends AbstractReadContext implements TransactionContext {
8787
static class Builder extends AbstractReadContext.Builder<Builder, TransactionContextImpl> {
88+
89+
private Clock clock = new Clock();
8890
private ByteString transactionId;
8991
private Options options;
9092
private boolean trackTransactionStarter;
9193

9294
private Builder() {}
9395

96+
Builder setClock(Clock clock) {
97+
this.clock = Preconditions.checkNotNull(clock);
98+
return self();
99+
}
100+
94101
Builder setTransactionId(ByteString transactionId) {
95102
this.transactionId = transactionId;
96103
return self();
@@ -189,13 +196,15 @@ public void removeListener(Runnable listener) {
189196
volatile ByteString transactionId;
190197

191198
private CommitResponse commitResponse;
199+
private final Clock clock;
192200

193201
private TransactionContextImpl(Builder builder) {
194202
super(builder);
195203
this.transactionId = builder.transactionId;
196204
this.trackTransactionStarter = builder.trackTransactionStarter;
197205
this.options = builder.options;
198206
this.finishedAsyncOperations.set(null);
207+
this.clock = builder.clock;
199208
}
200209

201210
@Override
@@ -389,6 +398,7 @@ public void run() {
389398
tracer.spanBuilderWithExplicitParent(SpannerImpl.COMMIT, span).startSpan();
390399
final ApiFuture<com.google.spanner.v1.CommitResponse> commitFuture =
391400
rpc.commitAsync(commitRequest, session.getOptions());
401+
session.markUsed(clock.instant());
392402
commitFuture.addListener(
393403
tracer.withSpan(
394404
opSpan,
@@ -463,12 +473,15 @@ ApiFuture<Empty> rollbackAsync() {
463473
// is still in flight. That transaction will then automatically be terminated by the server.
464474
if (transactionId != null) {
465475
span.addAnnotation("Starting Rollback");
466-
return rpc.rollbackAsync(
467-
RollbackRequest.newBuilder()
468-
.setSession(session.getName())
469-
.setTransactionId(transactionId)
470-
.build(),
471-
session.getOptions());
476+
ApiFuture<Empty> apiFuture =
477+
rpc.rollbackAsync(
478+
RollbackRequest.newBuilder()
479+
.setSession(session.getName())
480+
.setTransactionId(transactionId)
481+
.build(),
482+
session.getOptions());
483+
session.markUsed(clock.instant());
484+
return apiFuture;
472485
} else {
473486
return ApiFutures.immediateFuture(Empty.getDefaultInstance());
474487
}
@@ -723,6 +736,7 @@ private ResultSet internalExecuteUpdate(
723736
try {
724737
com.google.spanner.v1.ResultSet resultSet =
725738
rpc.executeQuery(builder.build(), session.getOptions(), isRouteToLeader());
739+
session.markUsed(clock.instant());
726740
if (resultSet.getMetadata().hasTransaction()) {
727741
onTransactionMetadata(
728742
resultSet.getMetadata().getTransaction(), builder.getTransaction().hasBegin());
@@ -753,6 +767,7 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
753767
// commit.
754768
increaseAsyncOperations();
755769
resultSet = rpc.executeQueryAsync(builder.build(), session.getOptions(), isRouteToLeader());
770+
session.markUsed(clock.instant());
756771
} catch (Throwable t) {
757772
decreaseAsyncOperations();
758773
throw t;
@@ -824,6 +839,7 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
824839
try {
825840
com.google.spanner.v1.ExecuteBatchDmlResponse response =
826841
rpc.executeBatchDml(builder.build(), session.getOptions());
842+
session.markUsed(clock.instant());
827843
long[] results = new long[response.getResultSetsCount()];
828844
for (int i = 0; i < response.getResultSetsCount(); ++i) {
829845
results[i] = response.getResultSets(i).getStats().getRowCountExact();
@@ -863,6 +879,7 @@ public ApiFuture<long[]> batchUpdateAsync(
863879
// commit.
864880
increaseAsyncOperations();
865881
response = rpc.executeBatchDmlAsync(builder.build(), session.getOptions());
882+
session.markUsed(clock.instant());
866883
} catch (Throwable t) {
867884
decreaseAsyncOperations();
868885
throw t;

0 commit comments

Comments
 (0)