Skip to content

Commit 5979f2d

Browse files
committed
chore: cleanup
1 parent 55daa50 commit 5979f2d

File tree

8 files changed

+52
-27
lines changed

8 files changed

+52
-27
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@
2222
import com.google.cloud.spanner.Options.UpdateOption;
2323
import com.google.spanner.v1.BatchWriteResponse;
2424

25+
/**
26+
* Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link
27+
* UnsupportedOperationException} for all methods that are currently not supported for multiplexed
28+
* sessions. The concrete implementation implements the methods that are supported with multiplexed
29+
* sessions.
30+
*/
2531
abstract class AbstractMultiplexedSessionDatabaseClient implements DatabaseClient {
2632

2733
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -157,20 +157,13 @@ static class SingleReadContext extends AbstractReadContext {
157157
static class Builder extends AbstractReadContext.Builder<Builder, SingleReadContext> {
158158
private TimestampBound bound;
159159

160-
private boolean useRandomChannelHint;
161-
162160
private Builder() {}
163161

164162
Builder setTimestampBound(TimestampBound bound) {
165163
this.bound = bound;
166164
return self();
167165
}
168166

169-
Builder setUseRandomChannelHint(boolean useRandomChannelHint) {
170-
this.useRandomChannelHint = useRandomChannelHint;
171-
return self();
172-
}
173-
174167
@Override
175168
SingleReadContext build() {
176169
return new SingleReadContext(this);
@@ -195,14 +188,13 @@ static Builder newBuilder() {
195188
private SingleReadContext(Builder builder) {
196189
super(builder);
197190
this.bound = builder.bound;
198-
// Use the channel hint that is stored with the regular session if that is being used.
199-
// Otherwise, either use a random channel hint or no hint at all.
191+
// single use transaction have a single RPC and hence there is no need
192+
// of a channel hint. GAX will automatically choose a hint when used
193+
// with a multiplexed session to perform a round-robin channel selection. We are
194+
// passing a hint here to prefer random channel selection instead of doing GAX round-robin.
200195
this.channelHint =
201196
getChannelHintOptions(
202-
session.getOptions(),
203-
builder.useRandomChannelHint
204-
? ThreadLocalRandom.current().nextLong(Long.MAX_VALUE)
205-
: null);
197+
session.getOptions(), ThreadLocalRandom.current().nextLong(Long.MAX_VALUE));
206198
}
207199

208200
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ class DatabaseClientImpl implements DatabaseClient {
3939

4040
@VisibleForTesting
4141
DatabaseClientImpl(SessionPool pool, TraceWrapper tracer) {
42-
this("", pool, null, tracer);
42+
this("", pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
4343
}
4444

4545
@VisibleForTesting
4646
DatabaseClientImpl(String clientId, SessionPool pool, TraceWrapper tracer) {
47-
this(clientId, pool, null, tracer);
47+
this(clientId, pool, /* multiplexedSessionDatabaseClient = */ null, tracer);
4848
}
4949

5050
DatabaseClientImpl(
@@ -286,7 +286,9 @@ private <T> T runWithSessionRetry(Function<Session, T> callable) {
286286

287287
boolean isValid() {
288288
return pool.isValid()
289-
&& (multiplexedSessionDatabaseClient == null || multiplexedSessionDatabaseClient.isValid());
289+
&& (multiplexedSessionDatabaseClient == null
290+
|| multiplexedSessionDatabaseClient.isValid()
291+
|| !multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported());
290292
}
291293

292294
ListenableFuture<Void> closeAsync(ClosedException closedException) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ public boolean next() throws SpannerException {
9090
boolean hasNext = currRow.consumeRow(iterator);
9191
if (!hasNext) {
9292
statistics = iterator.getStats();
93+
// Close the ResultSet when there is no more data.
9394
close();
9495
}
9596
return hasNext;
@@ -114,11 +115,14 @@ public ResultSetMetadata getMetadata() {
114115

115116
@Override
116117
public void close() {
117-
if (!closed) {
118-
listener.onDone(iterator.isWithBeginTransaction());
119-
iterator.close("ResultSet closed");
118+
synchronized (this) {
119+
if (closed) {
120+
return;
121+
}
120122
closed = true;
121123
}
124+
listener.onDone(iterator.isWithBeginTransaction());
125+
iterator.close("ResultSet closed");
122126
}
123127

124128
@Override

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,31 +109,53 @@ public void close() {
109109
}
110110
}
111111

112+
/**
113+
* Keeps track of which channels have been 'given' to single-use transactions for a given Spanner
114+
* instance.
115+
*/
112116
private static final Map<SpannerImpl, BitSet> CHANNEL_USAGE = new HashMap<>();
113117

114118
private final BitSet channelUsage;
115119

116120
private final int numChannels;
117121

122+
/**
123+
* The number of single-use read-only transactions currently running on this multiplexed session.
124+
*/
118125
private final AtomicInteger numCurrentSingleUseTransactions = new AtomicInteger();
119126

120127
private boolean isClosed;
121128

129+
/** The duration before we try to replace the multiplexed session. The default is 7 days. */
122130
private final Duration sessionExpirationDuration;
123131

124132
private final SessionClient sessionClient;
125133

126134
private final TraceWrapper tracer;
127135

136+
/** The current multiplexed session that is used by this client. */
128137
private final AtomicReference<ApiFuture<SessionReference>> multiplexedSessionReference;
129138

139+
/** The expiration date/time of the current multiplexed session. */
130140
private final AtomicReference<Instant> expirationDate;
131141

142+
/**
143+
* The maintainer runs every 10 minutes to check whether the multiplexed session should be
144+
* refreshed.
145+
*/
132146
private final MultiplexedSessionMaintainer maintainer;
133147

148+
/**
149+
* If a {@link DatabaseNotFoundException} or {@link InstanceNotFoundException} is returned by the
150+
* server, then we set this field to mark the client as invalid.
151+
*/
134152
private final AtomicReference<ResourceNotFoundException> resourceNotFoundException =
135153
new AtomicReference<>();
136154

155+
/**
156+
* This flag is set to true if the server return UNIMPLEMENTED when we try to create a multiplexed
157+
* session.
158+
*/
137159
private final AtomicBoolean unimplemented = new AtomicBoolean(false);
138160

139161
MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
@@ -293,7 +315,11 @@ private int getSingleUseChannelHint() {
293315
return NO_CHANNEL_HINT;
294316
}
295317
synchronized (this.channelUsage) {
296-
int channel = this.channelUsage.nextClearBit(0);
318+
// Get the first unused channel.
319+
int channel = this.channelUsage.nextClearBit(/* fromIndex = */ 0);
320+
// BitSet returns an index larger than its original size if all the bits are set.
321+
// This then means that all channels have already been assigned to single-use transactions,
322+
// and that we should not use a specific channel, but rather pick a random one.
297323
if (channel == this.numChannels) {
298324
return NO_CHANNEL_HINT;
299325
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,6 @@ public ReadContext singleUse(TimestampBound bound) {
307307
SingleReadContext.newBuilder()
308308
.setSession(this)
309309
.setTimestampBound(bound)
310-
.setUseRandomChannelHint(
311-
spanner.getOptions().getSessionPoolOptions().isUseRandomChannelHint())
312310
.setRpc(spanner.getRpc())
313311
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(getDatabaseId()))
314312
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())
@@ -332,8 +330,6 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) {
332330
SingleUseReadOnlyTransaction.newBuilder()
333331
.setSession(this)
334332
.setTimestampBound(bound)
335-
.setUseRandomChannelHint(
336-
spanner.getOptions().getSessionPoolOptions().isUseRandomChannelHint())
337333
.setRpc(spanner.getRpc())
338334
.setDefaultQueryOptions(spanner.getDefaultQueryOptions(getDatabaseId()))
339335
.setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks())

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2947,7 +2947,7 @@ private SessionPool(
29472947
this.waitOnMultiplexedSessionsLatch = new CountDownLatch(1);
29482948
}
29492949

2950-
// TODO: Remove once all code for multiplexed sessions have been removed from the pool.
2950+
// TODO: Remove once all code for multiplexed sessions has been removed from the pool.
29512951
private boolean useMultiplexedSessions() {
29522952
// Multiplexed sessions have moved to MultiplexedSessionDatabaseClient
29532953
return false;

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3045,8 +3045,7 @@ public void testDatabaseOrInstanceDoesNotExistOnCreate() {
30453045
DatabaseClientImpl dbClient =
30463046
(DatabaseClientImpl) spanner.getDatabaseClient(databaseId);
30473047
// The CreateSession / BatchCreateSessions failure should propagate to the client and
3048-
// not
3049-
// retry.
3048+
// not retry.
30503049
try (ResultSet rs = dbClient.singleUse().executeQuery(SELECT1)) {
30513050
mockSpanner.unfreeze();
30523051
assertThrows(ResourceNotFoundException.class, rs::next);

0 commit comments

Comments
 (0)