Skip to content

chore(spanner): Implement fallback for multiplexed session on Partitioned Operations #3631

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.spanner.Options.QueryOption;
import com.google.cloud.spanner.Options.ReadOption;
import com.google.cloud.spanner.spi.v1.SpannerRpc;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.Struct;
Expand All @@ -35,6 +36,7 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
Expand All @@ -59,6 +61,13 @@ public class BatchClientImpl implements BatchClient {
@GuardedBy("multiplexedSessionLock")
private final AtomicReference<SessionImpl> multiplexedSessionReference;

/**
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
*/
@VisibleForTesting
static final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);

BatchClientImpl(SessionClient sessionClient, boolean isMultiplexedSessionEnabled) {
this.sessionClient = checkNotNull(sessionClient);
this.isMultiplexedSessionEnabled = isMultiplexedSessionEnabled;
Expand All @@ -85,7 +94,7 @@ public String getDatabaseRole() {
@Override
public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
SessionImpl session;
if (isMultiplexedSessionEnabled) {
if (canUseMultiplexedSession()) {
session = getMultiplexedSession();
} else {
session = sessionClient.createSession();
Expand Down Expand Up @@ -131,6 +140,10 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
batchTransactionId);
}

private boolean canUseMultiplexedSession() {
return isMultiplexedSessionEnabled && !unimplementedForPartitionedOps.get();
}

private SessionImpl getMultiplexedSession() {
this.multiplexedSessionLock.lock();
try {
Expand Down Expand Up @@ -216,15 +229,26 @@ public List<Partition> partitionReadUsingIndex(
builder.setPartitionOptions(pbuilder.build());

final PartitionReadRequest request = builder.build();
PartitionResponse response = rpc.partitionRead(request, options);
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Partition.createReadPartition(
p.getPartitionToken(), partitionOptions, table, index, keys, columns, readOptions);
partitions.add(partition);
try {
PartitionResponse response = rpc.partitionRead(request, options);
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Partition.createReadPartition(
p.getPartitionToken(),
partitionOptions,
table,
index,
keys,
columns,
readOptions);
partitions.add(partition);
}
return partitions.build();
} catch (SpannerException e) {
maybeMarkUnimplementedForPartitionedOps(e);
throw e;
}
return partitions.build();
}

@Override
Expand Down Expand Up @@ -256,15 +280,27 @@ public List<Partition> partitionQuery(
builder.setPartitionOptions(pbuilder.build());

final PartitionQueryRequest request = builder.build();
PartitionResponse response = rpc.partitionQuery(request, options);
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Partition.createQueryPartition(
p.getPartitionToken(), partitionOptions, statement, queryOptions);
partitions.add(partition);
try {
PartitionResponse response = rpc.partitionQuery(request, options);
ImmutableList.Builder<Partition> partitions = ImmutableList.builder();
for (com.google.spanner.v1.Partition p : response.getPartitionsList()) {
Partition partition =
Partition.createQueryPartition(
p.getPartitionToken(), partitionOptions, statement, queryOptions);
partitions.add(partition);
}
return partitions.build();
} catch (SpannerException e) {
maybeMarkUnimplementedForPartitionedOps(e);
throw e;
}
}

void maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
if (MultiplexedSessionDatabaseClient.verifyErrorMessage(
spannerException, "Partitioned operations are not supported with multiplexed sessions")) {
unimplementedForPartitionedOps.set(true);
}
return partitions.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ private boolean canUseMultiplexedSessionsForRW() {
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForRWSupported();
}

private boolean canUseMultiplexedSessionsForPartitionedOps() {
return this.useMultiplexedSessionPartitionedOps
&& this.multiplexedSessionDatabaseClient != null
&& this.multiplexedSessionDatabaseClient.isMultiplexedSessionsForPartitionedOpsSupported();
}

@Override
public Dialect getDialect() {
return pool.getDialect();
Expand Down Expand Up @@ -323,8 +329,15 @@ public AsyncTransactionManager transactionManagerAsync(TransactionOption... opti

@Override
public long executePartitionedUpdate(final Statement stmt, final UpdateOption... options) {
if (useMultiplexedSessionPartitionedOps) {
return getMultiplexedSession().executePartitionedUpdate(stmt, options);

if (canUseMultiplexedSessionsForPartitionedOps()) {
try {
return getMultiplexedSession().executePartitionedUpdate(stmt, options);
} catch (SpannerException e) {
if (!multiplexedSessionDatabaseClient.maybeMarkUnimplementedForPartitionedOps(e)) {
throw e;
}
}
}
return executePartitionedUpdateWithPooledSession(stmt, options);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,11 @@ void onError(SpannerException spannerException) {
// UNIMPLEMENTED with error message "Transaction type read_write not supported with
// multiplexed sessions" is returned.
this.client.maybeMarkUnimplementedForRW(spannerException);
// Mark multiplexed sessions for Partitioned Ops as unimplemented and fall back to regular
// sessions if
// UNIMPLEMENTED with error message "Partitioned operations are not supported with multiplexed
// sessions".
this.client.maybeMarkUnimplementedForPartitionedOps(spannerException);
}

@Override
Expand Down Expand Up @@ -214,6 +219,12 @@ public void close() {
*/
@VisibleForTesting final AtomicBoolean unimplementedForRW = new AtomicBoolean(false);

/**
* This flag is set to true if the server return UNIMPLEMENTED when partitioned transaction is
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
*/
@VisibleForTesting final AtomicBoolean unimplementedForPartitionedOps = new AtomicBoolean(false);

MultiplexedSessionDatabaseClient(SessionClient sessionClient) {
this(sessionClient, Clock.systemUTC());
}
Expand Down Expand Up @@ -316,7 +327,18 @@ && verifyErrorMessage(
}
}

private boolean verifyErrorMessage(SpannerException spannerException, String message) {
boolean maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
if (spannerException.getErrorCode() == ErrorCode.UNIMPLEMENTED
&& verifyErrorMessage(
spannerException,
"Transaction type partitioned_dml not supported with multiplexed sessions")) {
unimplementedForPartitionedOps.set(true);
return true;
}
return false;
}

static boolean verifyErrorMessage(SpannerException spannerException, String message) {
if (spannerException.getCause() == null) {
return false;
}
Expand Down Expand Up @@ -391,6 +413,10 @@ boolean isMultiplexedSessionsForRWSupported() {
return !this.unimplementedForRW.get();
}

boolean isMultiplexedSessionsForPartitionedOpsSupported() {
return !this.unimplementedForPartitionedOps.get();
}

void close() {
synchronized (this) {
if (!this.isClosed) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,20 +46,12 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.spanner.v1.BatchWriteRequest;
import com.google.spanner.v1.BatchWriteResponse;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.*;
import com.google.spanner.v1.RequestOptions.Priority;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.Transaction;
import io.grpc.Status;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -1540,6 +1532,89 @@ public void testInitialBeginTransactionWithRW_receivesUnimplemented_fallsBackToR
assertFalse(session2.getMultiplexed());
}

// Tests the behavior of the server-side kill switch for read-write multiplexed sessions.
@Test
public void
testInitialBeginTransactionWithPDML_receivesUnimplemented_fallsBackToRegularSession() {
mockSpanner.setBeginTransactionExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNIMPLEMENTED
.withDescription(
"Transaction type partitioned_dml not supported with multiplexed sessions")
.asRuntimeException(),
Status.UNIMPLEMENTED
.withDescription(
"Transaction type partitioned_dml not supported with multiplexed sessions")
.asRuntimeException())));
DatabaseClientImpl client =
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));

assertNotNull(client.multiplexedSessionDatabaseClient);

// Partitioned Ops transaction should fallback to regular sessions
assertEquals(UPDATE_COUNT, client.executePartitionedUpdate(UPDATE_STATEMENT));

// Verify that we received one ExecuteSqlRequest, and it uses a regular session due to fallback.
List<ExecuteSqlRequest> executeSqlRequests =
mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
assertEquals(1, executeSqlRequests.size());
// Verify the requests are not executed using multiplexed sessions
Session session2 = mockSpanner.getSession(executeSqlRequests.get(0).getSession());
assertNotNull(session2);
assertFalse(session2.getMultiplexed());
assertTrue(client.multiplexedSessionDatabaseClient.unimplementedForPartitionedOps.get());
}

// Tests the behavior of the server-side kill switch for read-write multiplexed sessions.
@Test
public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession() {
mockSpanner.setPartitionQueryExecutionTime(
SimulatedExecutionTime.ofException(
Status.INVALID_ARGUMENT
.withDescription(
"Partitioned operations are not supported with multiplexed sessions")
.asRuntimeException()));
BatchClientImpl client = (BatchClientImpl) spanner.getBatchClient(DatabaseId.of("p", "i", "d"));

try (BatchReadOnlyTransaction transaction =
client.batchReadOnlyTransaction(TimestampBound.strong())) {
// Partitioned Query should fail
SpannerException spannerException =
assertThrows(
SpannerException.class,
() -> {
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
});
assertEquals(ErrorCode.INVALID_ARGUMENT, spannerException.getErrorCode());

// Verify that we received one PartitionQueryRequest.
List<PartitionQueryRequest> partitionQueryRequests =
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
assertEquals(1, partitionQueryRequests.size());
// Verify the requests were executed using multiplexed sessions
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
assertNotNull(session2);
assertTrue(session2.getMultiplexed());
assertTrue(client.unimplementedForPartitionedOps.get());
}
try (BatchReadOnlyTransaction transaction =
client.batchReadOnlyTransaction(TimestampBound.strong())) {
// Partitioned Query should fail
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);

// // Verify that we received two PartitionQueryRequest. and it uses a regular session due to
// fallback.
List<PartitionQueryRequest> partitionQueryRequests =
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
assertEquals(2, partitionQueryRequests.size());
// Verify the requests are not executed using multiplexed sessions
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
assertNotNull(session2);
assertFalse(session2.getMultiplexed());
}
}

@Test
public void
testReadWriteUnimplementedErrorDuringInitialBeginTransactionRPC_firstReceivesError_secondFallsBackToRegularSessions() {
Expand Down
Loading