Skip to content

chore(spanner): handle server side kill switch of R/W multiplexed sessions #3441

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ DatabaseClient getMultiplexedSession() {

@VisibleForTesting
DatabaseClient getMultiplexedSessionForRW() {
if (this.useMultiplexedSessionForRW) {
if (canUseMultiplexedSessionsForRW()) {
return getMultiplexedSession();
}
return getSession();
Expand All @@ -107,6 +107,12 @@ private boolean canUseMultiplexedSessions() {
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsSupported();
}

private boolean canUseMultiplexedSessionsForRW() {
return this.useMultiplexedSessionForRW
&& this.multiplexedSessionDatabaseClient != null
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForRWSupported();
}

@Override
public Dialect getDialect() {
return pool.getDialect();
Expand All @@ -129,7 +135,7 @@ public CommitResponse writeWithOptions(
throws SpannerException {
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, options);
try (IScope s = tracer.withSpan(span)) {
if (this.useMultiplexedSessionForRW && getMultiplexedSessionDatabaseClient() != null) {
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
return getMultiplexedSessionDatabaseClient().writeWithOptions(mutations, options);
}
return runWithSessionRetry(session -> session.writeWithOptions(mutations, options));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import static com.google.cloud.spanner.SessionImpl.NO_CHANNEL_HINT;
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
Expand All @@ -27,6 +28,10 @@
import com.google.cloud.spanner.SpannerException.ResourceNotFoundException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.RequestOptions;
import com.google.spanner.v1.Transaction;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
Expand Down Expand Up @@ -92,6 +97,10 @@ void onError(SpannerException spannerException) {
// synchronizing, as it does not really matter exactly which error is set.
this.client.resourceNotFoundException.set((ResourceNotFoundException) spannerException);
}
// Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions if
// UNIMPLEMENTED with error message "Transaction type read_write not supported with
// multiplexed sessions" is returned.
this.client.maybeMarkUnimplementedForRW(spannerException);
}

@Override
Expand Down Expand Up @@ -164,6 +173,12 @@ public void close() {
/** The current multiplexed session that is used by this client. */
private final AtomicReference<ApiFuture<SessionReference>> multiplexedSessionReference;

/**
* The Transaction response returned by the BeginTransaction request with read-write when a
* multiplexed session is created during client initialization.
*/
private final SettableApiFuture<Transaction> readWriteBeginTransactionReferenceFuture;

/** The expiration date/time of the current multiplexed session. */
private final AtomicReference<Instant> expirationDate;

Expand All @@ -190,6 +205,12 @@ public void close() {
*/
private final AtomicBoolean unimplemented = new AtomicBoolean(false);

/**
* This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
*/
@VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean(false);

MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
this(sessionClient, Clock.systemUTC());
}
Expand Down Expand Up @@ -217,6 +238,7 @@ public void close() {
this.tracer = sessionClient.getSpanner().getTracer();
final SettableApiFuture<SessionReference> initialSessionReferenceFuture =
SettableApiFuture.create();
this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
this.sessionClient.asyncCreateMultiplexedSession(
new SessionConsumer() {
Expand All @@ -226,6 +248,16 @@ public void onSessionReady(SessionImpl session) {
// only start the maintainer if we actually managed to create a session in the first
// place.
maintainer.start();

// initiate a begin transaction request to verify if read-write transactions are
// supported using multiplexed sessions.
if (sessionClient
.getSpanner()
.getOptions()
.getSessionPoolOptions()
.getUseMultiplexedSessionForRW()) {
verifyBeginTransactionWithRWOnMultiplexedSessionAsync(session.getName());
}
}

@Override
Expand Down Expand Up @@ -267,6 +299,70 @@ private void maybeMarkUnimplemented(Throwable t) {
}
}

private void maybeMarkUnimplementedForRW(SpannerException spannerException) {
if (spannerException.getErrorCode() == ErrorCode.UNIMPLEMENTED
&& verifyErrorMessage(
spannerException,
"Transaction type read_write not supported with multiplexed sessions")) {
unimplementedForRW.set(true);
}
}

private boolean verifyErrorMessage(SpannerException spannerException, String message) {
if (spannerException.getCause() == null) {
return false;
}
if (spannerException.getCause().getMessage() == null) {
return false;
}
return spannerException.getCause().getMessage().contains(message);
}

private void verifyBeginTransactionWithRWOnMultiplexedSessionAsync(String sessionName) {
// TODO: Remove once this is guaranteed to be available.
// annotate the explict BeginTransactionRequest with a transaction tag
// "multiplexed-rw-background-begin-txn" to avoid storing this request on mock spanner.
// this is to safeguard other mock spanner tests whose BeginTransaction request count will
// otherwise increase by 1. Modifying the unit tests do not seem valid since this code is
// temporary and will be removed once the read-write on multiplexed session looks stable at
// backend.
BeginTransactionRequest.Builder requestBuilder =
BeginTransactionRequest.newBuilder()
.setSession(sessionName)
.setOptions(
SessionImpl.createReadWriteTransactionOptions(
Options.fromTransactionOptions(), /* previousTransactionId = */ null))
.setRequestOptions(
RequestOptions.newBuilder()
.setTransactionTag("multiplexed-rw-background-begin-txn")
.build());
final BeginTransactionRequest request = requestBuilder.build();
final ApiFuture<Transaction> requestFuture;
requestFuture =
sessionClient
.getSpanner()
.getRpc()
.beginTransactionAsync(request, /* options = */ null, /* routeToLeader = */ true);
requestFuture.addListener(
() -> {
try {
Transaction txn = requestFuture.get();
if (txn.getId().isEmpty()) {
throw newSpannerException(
ErrorCode.INTERNAL, "Missing id in transaction\n" + sessionName);
}
readWriteBeginTransactionReferenceFuture.set(txn);
} catch (Exception e) {
SpannerException spannerException = SpannerExceptionFactory.newSpannerException(e);
// Mark multiplexed sessions for RW as unimplemented and fall back to regular sessions
// if UNIMPLEMENTED is returned.
maybeMarkUnimplementedForRW(spannerException);
readWriteBeginTransactionReferenceFuture.setException(e);
}
},
MoreExecutors.directExecutor());
}

boolean isValid() {
return resourceNotFoundException.get() == null;
}
Expand All @@ -283,6 +379,10 @@ boolean isMultiplexedSessionsSupported() {
return !this.unimplemented.get();
}

boolean isMultiplexedSessionsForRWSupported() {
return !this.unimplementedForRW.get();
}

void close() {
synchronized (this) {
if (!this.isClosed) {
Expand All @@ -308,6 +408,17 @@ SessionReference getCurrentSessionReference() {
}
}

@VisibleForTesting
Transaction getReadWriteBeginTransactionReference() {
try {
return this.readWriteBeginTransactionReferenceFuture.get();
} catch (ExecutionException executionException) {
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
} catch (InterruptedException interruptedException) {
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
}
}

/**
* Returns true if the multiplexed session has been created. This client can be used before the
* session has been created, and will in that case use a delayed transaction that contains a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1872,7 +1872,17 @@ private Transaction getTemporaryTransactionOrNull(TransactionSelector tx) {
@Override
public void beginTransaction(
BeginTransactionRequest request, StreamObserver<Transaction> responseObserver) {
requests.add(request);
// TODO: Remove once this is guaranteed to be available.
// Skip storing the explicit BeginTransactionRequest used to verify read-write transaction
// server availability on multiplexed sessions.
// This code will be removed once read-write multiplexed sessions are stable on the backend,
// hence the temporary trade-off.
if (!request
.getRequestOptions()
.getTransactionTag()
.equals("multiplexed-rw-background-begin-txn")) {
requests.add(request);
}
Preconditions.checkNotNull(request.getSession());
Session session = getSession(request.getSession());
if (session == null) {
Expand Down
Loading
Loading