Skip to content

feat: add session pool option for modelling a timeout around session acquisition. #2641

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-spanner'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-spanner:6.47.0'
implementation 'com.google.cloud:google-cloud-spanner:6.48.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.47.0"
libraryDependencies += "com.google.cloud" % "google-cloud-spanner" % "6.48.0"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -431,7 +431,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-spanner/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-spanner.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.47.0
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-spanner/6.48.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ForwardingListenableFuture;
import com.google.common.util.concurrent.ForwardingListenableFuture.SimpleForwardingListenableFuture;
import com.google.common.util.concurrent.ListenableFuture;
Expand Down Expand Up @@ -120,17 +119,6 @@ class SessionPool {
private static final Logger logger = Logger.getLogger(SessionPool.class.getName());
private static final Tracer tracer = Tracing.getTracer();
static final String WAIT_FOR_SESSION = "SessionPool.WaitForSession";
static final ImmutableSet<ErrorCode> SHOULD_STOP_PREPARE_SESSIONS_ERROR_CODES =
ImmutableSet.of(
ErrorCode.UNKNOWN,
ErrorCode.INVALID_ARGUMENT,
ErrorCode.PERMISSION_DENIED,
ErrorCode.UNAUTHENTICATED,
ErrorCode.RESOURCE_EXHAUSTED,
ErrorCode.FAILED_PRECONDITION,
ErrorCode.OUT_OF_RANGE,
ErrorCode.UNIMPLEMENTED,
ErrorCode.INTERNAL);

/**
* If the {@link SessionPoolOptions#getWaitForMinSessions()} duration is greater than zero, waits
Expand Down Expand Up @@ -1675,7 +1663,8 @@ public PooledSession get() {
while (true) {
Span span = tracer.spanBuilder(WAIT_FOR_SESSION).startSpan();
try (Scope waitScope = tracer.withSpan(span)) {
PooledSession s = pollUninterruptiblyWithTimeout(currentTimeout);
PooledSession s =
pollUninterruptiblyWithTimeout(currentTimeout, options.getAcquireSessionTimeout());
if (s == null) {
// Set the status to DEADLINE_EXCEEDED and retry.
numWaiterTimeouts.incrementAndGet();
Expand All @@ -1685,6 +1674,11 @@ public PooledSession get() {
return s;
}
} catch (Exception e) {
if (e instanceof SpannerException
&& ErrorCode.RESOURCE_EXHAUSTED.equals(((SpannerException) e).getErrorCode())) {
numWaiterTimeouts.incrementAndGet();
tracer.getCurrentSpan().setStatus(Status.RESOURCE_EXHAUSTED);
}
TraceUtil.setWithFailure(span, e);
throw e;
} finally {
Expand All @@ -1693,15 +1687,26 @@ public PooledSession get() {
}
}

private PooledSession pollUninterruptiblyWithTimeout(long timeoutMillis) {
private PooledSession pollUninterruptiblyWithTimeout(
long timeoutMillis, Duration acquireSessionTimeout) {
boolean interrupted = false;
try {
while (true) {
try {
return waiter.get(timeoutMillis, TimeUnit.MILLISECONDS);
return acquireSessionTimeout == null
? waiter.get(timeoutMillis, TimeUnit.MILLISECONDS)
: waiter.get(acquireSessionTimeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
interrupted = true;
} catch (TimeoutException e) {
if (acquireSessionTimeout != null) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.RESOURCE_EXHAUSTED,
"Timed out after waiting "
+ acquireSessionTimeout.toMillis()
+ "ms for acquiring session. To mitigate error SessionPoolOptions#setAcquireSessionTimeout(Duration) to set a higher timeout"
+ " or increase the number of sessions in the session pool.");
}
return null;
} catch (ExecutionException e) {
throw SpannerExceptionFactory.newSpannerException(e.getCause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,16 @@ public class SessionPoolOptions {
private final ActionOnSessionLeak actionOnSessionLeak;
private final boolean trackStackTraceOfSessionCheckout;
private final InactiveTransactionRemovalOptions inactiveTransactionRemovalOptions;
private final long initialWaitForSessionTimeoutMillis;

/**
* Use {@link #acquireSessionTimeout} instead to specify the total duration to wait while
* acquiring session for a transaction.
*/
@Deprecated private final long initialWaitForSessionTimeoutMillis;

private final boolean autoDetectDialect;
private final Duration waitForMinSessions;
private final Duration acquireSessionTimeout;

/** Property for allowing mocking of session maintenance clock. */
private final Clock poolMaintainerClock;
Expand All @@ -78,6 +85,7 @@ private SessionPoolOptions(Builder builder) {
this.removeInactiveSessionAfter = builder.removeInactiveSessionAfter;
this.autoDetectDialect = builder.autoDetectDialect;
this.waitForMinSessions = builder.waitForMinSessions;
this.acquireSessionTimeout = builder.acquireSessionTimeout;
this.inactiveTransactionRemovalOptions = builder.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = builder.poolMaintainerClock;
}
Expand Down Expand Up @@ -105,6 +113,7 @@ public boolean equals(Object o) {
&& Objects.equals(this.removeInactiveSessionAfter, other.removeInactiveSessionAfter)
&& Objects.equals(this.autoDetectDialect, other.autoDetectDialect)
&& Objects.equals(this.waitForMinSessions, other.waitForMinSessions)
&& Objects.equals(this.acquireSessionTimeout, other.acquireSessionTimeout)
&& Objects.equals(
this.inactiveTransactionRemovalOptions, other.inactiveTransactionRemovalOptions)
&& Objects.equals(this.poolMaintainerClock, other.poolMaintainerClock);
Expand All @@ -128,6 +137,7 @@ public int hashCode() {
this.removeInactiveSessionAfter,
this.autoDetectDialect,
this.waitForMinSessions,
this.acquireSessionTimeout,
this.inactiveTransactionRemovalOptions,
this.poolMaintainerClock);
}
Expand Down Expand Up @@ -239,6 +249,11 @@ Duration getWaitForMinSessions() {
return waitForMinSessions;
}

@VisibleForTesting
Duration getAcquireSessionTimeout() {
return acquireSessionTimeout;
}

public static Builder newBuilder() {
return new Builder();
}
Expand Down Expand Up @@ -424,6 +439,7 @@ public static class Builder {
private Duration removeInactiveSessionAfter = Duration.ofMinutes(55L);
private boolean autoDetectDialect = false;
private Duration waitForMinSessions = Duration.ZERO;
private Duration acquireSessionTimeout = Duration.ofSeconds(60);

private Clock poolMaintainerClock;

Expand All @@ -446,6 +462,7 @@ private Builder(SessionPoolOptions options) {
this.removeInactiveSessionAfter = options.removeInactiveSessionAfter;
this.autoDetectDialect = options.autoDetectDialect;
this.waitForMinSessions = options.waitForMinSessions;
this.acquireSessionTimeout = options.acquireSessionTimeout;
this.inactiveTransactionRemovalOptions = options.inactiveTransactionRemovalOptions;
this.poolMaintainerClock = options.poolMaintainerClock;
}
Expand Down Expand Up @@ -538,6 +555,10 @@ public Builder setFailIfPoolExhausted() {
/**
* If all sessions are in use and there is no more room for creating new sessions, block for a
* session to become available. Default behavior is same.
*
* <p>By default the requests are blocked for 60s and will fail with a `SpannerException` with
* error code `ResourceExhausted` if this timeout is exceeded. If you wish to block for a
* different period use the option {@link Builder#setAcquireSessionTimeout(Duration)} ()}
*/
public Builder setBlockIfPoolExhausted() {
this.actionOnExhaustion = ActionOnExhaustion.BLOCK;
Expand Down Expand Up @@ -695,6 +716,25 @@ public Builder setWaitForMinSessions(Duration waitForMinSessions) {
return this;
}

/**
* If greater than zero, we wait for said duration when no sessions are available in the {@link
* SessionPool}. The default is a 60s timeout. Set the value to null to disable the timeout.
*/
public Builder setAcquireSessionTimeout(Duration acquireSessionTimeout) {
try {
if (acquireSessionTimeout != null) {
Preconditions.checkArgument(
acquireSessionTimeout.toMillis() > 0,
"acquireSessionTimeout should be greater than 0 ns");
}
} catch (ArithmeticException ex) {
throw new IllegalArgumentException(
"acquireSessionTimeout in millis should be lesser than Long.MAX_VALUE");
}
this.acquireSessionTimeout = acquireSessionTimeout;
return this;
}

/** Build a SessionPoolOption object */
public SessionPoolOptions build() {
validate();
Expand Down
Loading