Skip to content

chore: fallback for Partitioned Operations with multiplexed session #3710

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,32 @@ ByteString getTransactionId() {
}
}

/**
* Initializes the transaction with the timestamp specified within MultiUseReadOnlyTransaction.
* This is used only for fallback of PartitionQueryRequest and PartitionReadRequest with
* Multiplexed Session.
*/
void initFallbackTransaction() {
synchronized (txnLock) {
span.addAnnotation("Creating Transaction");
TransactionOptions.Builder options = TransactionOptions.newBuilder();
if (timestamp != null) {
options
.getReadOnlyBuilder()
.setReadTimestamp(timestamp.toProto())
.setReturnReadTimestamp(true);
} else {
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
}
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(options)
.build();
initTransactionInternal(request);
}
}

void initTransaction() {
SessionImpl.throwIfTransactionsPending();

Expand All @@ -419,40 +445,43 @@ void initTransaction() {
return;
}
span.addAnnotation("Creating Transaction");
TransactionOptions.Builder options = TransactionOptions.newBuilder();
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(options)
.build();
initTransactionInternal(request);
}
}

private void initTransactionInternal(BeginTransactionRequest request) {
try {
Transaction transaction =
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
}
if (transaction.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
}
try {
TransactionOptions.Builder options = TransactionOptions.newBuilder();
bound.applyToBuilder(options.getReadOnlyBuilder()).setReturnReadTimestamp(true);
final BeginTransactionRequest request =
BeginTransactionRequest.newBuilder()
.setSession(session.getName())
.setOptions(options)
.build();
Transaction transaction =
rpc.beginTransaction(request, getTransactionChannelHint(), isRouteToLeader());
if (!transaction.hasReadTimestamp()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
}
if (transaction.getId().isEmpty()) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
}
try {
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
} catch (IllegalArgumentException e) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
}
transactionId = transaction.getId();
span.addAnnotation(
"Transaction Creation Done",
ImmutableMap.of(
"Id", transaction.getId().toStringUtf8(), "Timestamp", timestamp.toString()));

} catch (SpannerException e) {
span.addAnnotation("Transaction Creation Failed", e);
throw e;
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
} catch (IllegalArgumentException e) {
throw SpannerExceptionFactory.newSpannerException(
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
}
transactionId = transaction.getId();
span.addAnnotation(
"Transaction Creation Done",
ImmutableMap.of(
"Id", transaction.getId().toStringUtf8(), "Timestamp", timestamp.toString()));
} catch (SpannerException e) {
span.addAnnotation("Transaction Creation Failed", e);
throw e;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
sessionClient.getSpanner().getOptions().getDirectedReadOptions())
.setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
.setTracer(sessionClient.getSpanner().getTracer()),
checkNotNull(bound));
checkNotNull(bound),
sessionClient);
}

@Override
Expand All @@ -137,7 +138,8 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
sessionClient.getSpanner().getOptions().getDirectedReadOptions())
.setSpan(sessionClient.getSpanner().getTracer().getCurrentSpan())
.setTracer(sessionClient.getSpanner().getTracer()),
batchTransactionId);
batchTransactionId,
sessionClient);
}

private boolean canUseMultiplexedSession() {
Expand All @@ -160,20 +162,28 @@ private SessionImpl getMultiplexedSession() {

private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction
implements BatchReadOnlyTransaction {
private final String sessionName;
private String sessionName;
private final Map<SpannerRpc.Option, ?> options;
private final SessionClient sessionClient;
private final AtomicBoolean fallbackInitiated = new AtomicBoolean(false);

BatchReadOnlyTransactionImpl(
MultiUseReadOnlyTransaction.Builder builder, TimestampBound bound) {
MultiUseReadOnlyTransaction.Builder builder,
TimestampBound bound,
SessionClient sessionClient) {
super(builder.setTimestampBound(bound));
this.sessionClient = sessionClient;
this.sessionName = session.getName();
this.options = session.getOptions();
initTransaction();
}

BatchReadOnlyTransactionImpl(
MultiUseReadOnlyTransaction.Builder builder, BatchTransactionId batchTransactionId) {
MultiUseReadOnlyTransaction.Builder builder,
BatchTransactionId batchTransactionId,
SessionClient sessionClient) {
super(builder.setTransactionId(batchTransactionId.getTransactionId()));
this.sessionClient = sessionClient;
this.sessionName = session.getName();
this.options = session.getOptions();
}
Expand Down Expand Up @@ -204,6 +214,18 @@ public List<Partition> partitionReadUsingIndex(
Iterable<String> columns,
ReadOption... option)
throws SpannerException {
return partitionReadUsingIndex(partitionOptions, table, index, keys, columns, false, option);
}

private List<Partition> partitionReadUsingIndex(
PartitionOptions partitionOptions,
String table,
String index,
KeySet keys,
Iterable<String> columns,
boolean isFallback,
ReadOption... option)
throws SpannerException {
Options readOptions = Options.fromReadOptions(option);
Preconditions.checkArgument(
!readOptions.hasLimit(),
Expand Down Expand Up @@ -246,7 +268,10 @@ public List<Partition> partitionReadUsingIndex(
}
return partitions.build();
} catch (SpannerException e) {
maybeMarkUnimplementedForPartitionedOps(e);
if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) {
return partitionReadUsingIndex(
partitionOptions, table, index, keys, columns, true, option);
}
throw e;
}
}
Expand All @@ -255,6 +280,15 @@ public List<Partition> partitionReadUsingIndex(
public List<Partition> partitionQuery(
PartitionOptions partitionOptions, Statement statement, QueryOption... option)
throws SpannerException {
return partitionQuery(partitionOptions, statement, false, option);
}

private List<Partition> partitionQuery(
PartitionOptions partitionOptions,
Statement statement,
boolean isFallback,
QueryOption... option)
throws SpannerException {
Options queryOptions = Options.fromQueryOptions(option);
final PartitionQueryRequest.Builder builder =
PartitionQueryRequest.newBuilder().setSession(sessionName).setSql(statement.getSql());
Expand Down Expand Up @@ -291,16 +325,29 @@ public List<Partition> partitionQuery(
}
return partitions.build();
} catch (SpannerException e) {
maybeMarkUnimplementedForPartitionedOps(e);
if (!isFallback && maybeMarkUnimplementedForPartitionedOps(e)) {
return partitionQuery(partitionOptions, statement, true, option);
}
throw e;
}
}

void maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
boolean maybeMarkUnimplementedForPartitionedOps(SpannerException spannerException) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that we should mark this entire method as synchronized. In theory, the partitionRead and partitionQuery methods could be executed in parallel, and if an UNIMPLEMENTED error is returned, we would only want to create the fallback transaction once.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. Have synchronized on session.

if (MultiplexedSessionDatabaseClient.verifyErrorMessage(
spannerException, "Partitioned operations are not supported with multiplexed sessions")) {
unimplementedForPartitionedOps.set(true);
synchronized (fallbackInitiated) {
if (!fallbackInitiated.get()) {
session.setFallbackSessionReference(
sessionClient.createSession().getSessionReference());
sessionName = session.getName();
initFallbackTransaction();
unimplementedForPartitionedOps.set(true);
fallbackInitiated.set(true);
}
return true;
}
}
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.spanner;

import com.google.cloud.Timestamp;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import java.io.Serializable;
Expand All @@ -34,6 +35,7 @@ public class BatchTransactionId implements Serializable {
private final Timestamp timestamp;
private static final long serialVersionUID = 8067099123096783939L;

@VisibleForTesting
BatchTransactionId(String sessionId, ByteString transactionId, Timestamp timestamp) {
this.transactionId = Preconditions.checkNotNull(transactionId);
this.sessionId = Preconditions.checkNotNull(sessionId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ interface SessionTransaction {
static final int NO_CHANNEL_HINT = -1;

private final SpannerImpl spanner;
private final SessionReference sessionReference;
private SessionReference sessionReference;
private SessionTransaction activeTransaction;
private ISpan currentSpan;
private final Clock clock;
Expand Down Expand Up @@ -160,6 +160,14 @@ public String getName() {
return sessionReference.getName();
}

/**
* Updates the session reference with the fallback session. This should only be used for updating
* session reference with regular session in case of unimplemented error in multiplexed session.
*/
void setFallbackSessionReference(SessionReference sessionReference) {
this.sessionReference = sessionReference;
}

Map<SpannerRpc.Option, ?> getOptions() {
return options;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1554,7 +1554,11 @@ public void testInitialBeginTransactionWithRW_receivesUnimplemented_fallsBackToR
assertTrue(client.multiplexedSessionDatabaseClient.unimplementedForPartitionedOps.get());
}

// Tests the behavior of the server-side kill switch for read-write multiplexed sessions.
/**
* Tests the behavior of the server-side kill switch for partitioned query multiplexed sessions. 2
* PartitionQueryRequest should be received. First with Multiplexed session and second with
* regular session.
*/
@Test
public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession() {
try {
Expand All @@ -1569,40 +1573,81 @@ public void testPartitionedQuery_receivesUnimplemented_fallsBackToRegularSession

try (BatchReadOnlyTransaction transaction =
client.batchReadOnlyTransaction(TimestampBound.strong())) {
// Partitioned Query should fail
SpannerException spannerException =
assertThrows(
SpannerException.class,
() -> {
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);
});
assertEquals(ErrorCode.INVALID_ARGUMENT, spannerException.getErrorCode());
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);

// Verify that we received one PartitionQueryRequest.
List<PartitionQueryRequest> partitionQueryRequests =
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
assertEquals(1, partitionQueryRequests.size());
assertEquals(2, partitionQueryRequests.size());
// Verify the requests were executed using multiplexed sessions
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
assertNotNull(session2);
assertTrue(session2.getMultiplexed());
Session session = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
assertNotNull(session);
assertTrue(session.getMultiplexed());
assertTrue(BatchClientImpl.unimplementedForPartitionedOps.get());

session = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
assertNotNull(session);
assertFalse(session.getMultiplexed());
}
} finally {
BatchClientImpl.unimplementedForPartitionedOps.set(false);
}
}

/**
* Tests the behavior of the server-side kill switch for partitioned query multiplexed sessions.
* The BatchReadOnlyTransaction is initiated using BatchTransactionId. 2 PartitionQueryRequest
* should be received. First with Multiplexed session and second with regular session.
*/
@Test
public void
testPartitionedQueryWithTransactionId_receivesUnimplemented_fallsBackToRegularSession() {
try {
mockSpanner.setPartitionQueryExecutionTime(
SimulatedExecutionTime.ofException(
Status.INVALID_ARGUMENT
.withDescription(
"Partitioned operations are not supported with multiplexed sessions")
.asRuntimeException()));
BatchClientImpl client =
(BatchClientImpl) spanner.getBatchClient(DatabaseId.of("p", "i", "d"));

try (BatchReadOnlyTransaction transaction =
client.batchReadOnlyTransaction(TimestampBound.strong())) {
// Partitioned Query should fail
transaction.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);

// // Verify that we received two PartitionQueryRequest. and it uses a regular session due
// to
// fallback.
List<PartitionQueryRequest> partitionQueryRequests =
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
assertEquals(2, partitionQueryRequests.size());
// Verify the requests are not executed using multiplexed sessions
Session session2 = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
assertNotNull(session2);
assertFalse(session2.getMultiplexed());
try (BatchReadOnlyTransaction transaction1 =
client.batchReadOnlyTransaction(transaction.getBatchTransactionId())) {
transaction1.partitionQuery(PartitionOptions.getDefaultInstance(), STATEMENT);

// Verify that we received one PartitionQueryRequest.
List<PartitionQueryRequest> partitionQueryRequests =
mockSpanner.getRequestsOfType(PartitionQueryRequest.class);
assertEquals(2, partitionQueryRequests.size());
// Verify the requests were executed using multiplexed sessions
Session session = mockSpanner.getSession(partitionQueryRequests.get(0).getSession());
assertNotNull(session);
assertTrue(session.getMultiplexed());
assertTrue(BatchClientImpl.unimplementedForPartitionedOps.get());

session = mockSpanner.getSession(partitionQueryRequests.get(1).getSession());
assertNotNull(session);
assertFalse(session.getMultiplexed());

List<BeginTransactionRequest> beginTransactionRequests =
mockSpanner.getRequestsOfType(BeginTransactionRequest.class);
assertEquals(2, beginTransactionRequests.size());

session = mockSpanner.getSession(beginTransactionRequests.get(0).getSession());
assertNotNull(session);
assertTrue(session.getMultiplexed());

session = mockSpanner.getSession(beginTransactionRequests.get(1).getSession());
assertNotNull(session);
assertFalse(session.getMultiplexed());
assertEquals(
transaction.getBatchTransactionId().getTimestamp(),
transaction1.getBatchTransactionId().getTimestamp());
}
}
} finally {
BatchClientImpl.unimplementedForPartitionedOps.set(false);
Expand Down
Loading