Skip to content

chore(spanner): add multiplexed session flag in executor code #3030

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
10 changes: 10 additions & 0 deletions google-cloud-spanner-executor/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/executor/spanner/CloudExecutorImpl</className>
<method>CloudExecutorImpl(boolean)</method>
<to>CloudExecutorImpl(boolean, double)</to>
</difference>
</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@

import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerExceptionFactory;
import com.google.spanner.executor.v1.SessionPoolOptions;
import com.google.spanner.executor.v1.SpannerAction;
import com.google.spanner.executor.v1.SpannerAsyncActionRequest;
import com.google.spanner.executor.v1.SpannerAsyncActionResponse;
import com.google.spanner.executor.v1.SpannerExecutorProxyGrpc;
import com.google.spanner.executor.v1.SpannerOptions;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.logging.Level;
Expand All @@ -34,8 +37,13 @@ public class CloudExecutorImpl extends SpannerExecutorProxyGrpc.SpannerExecutorP
// Executors to proxy.
private final CloudClientExecutor clientExecutor;

public CloudExecutorImpl(boolean enableGrpcFaultInjector) {
// Ratio of operations to use multiplexed sessions.
private final double multiplexedSessionOperationsRatio;

public CloudExecutorImpl(
boolean enableGrpcFaultInjector, double multiplexedSessionOperationsRatio) {
clientExecutor = new CloudClientExecutor(enableGrpcFaultInjector);
this.multiplexedSessionOperationsRatio = multiplexedSessionOperationsRatio;
}

/** Execute SpannerAsync action requests. */
Expand All @@ -48,6 +56,36 @@ public StreamObserver<SpannerAsyncActionRequest> executeActionAsync(
@Override
public void onNext(SpannerAsyncActionRequest request) {
LOGGER.log(Level.INFO, String.format("Receiving request: \n%s", request));

// Use Multiplexed sessions for all supported operations if the
// multiplexedSessionOperationsRatio from command line is > 0.0
if (multiplexedSessionOperationsRatio > 0.0) {
SessionPoolOptions.Builder sessionPoolOptionsBuilder;
if (request.getAction().getSpannerOptions().hasSessionPoolOptions()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a check like request.getAction().hasSpannerOptions() before doing getSpannerOptions. Else this will throw a NullPointerException.

Copy link
Contributor Author

@harshachinta harshachinta Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The autogenerated methods from the proto always returns a default instance of the class and not null. In this case getAction(), getSpannerOptions etc will never be null. Hence, we are safe from not getting an NullPointerException
Reference:

public com.google.spanner.executor.v1.SpannerAction getAction() {
return action_ == null
? com.google.spanner.executor.v1.SpannerAction.getDefaultInstance()
: action_;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, got it.

sessionPoolOptionsBuilder =
request
.getAction()
.getSpannerOptions()
.getSessionPoolOptions()
.toBuilder()
.setUseMultiplexed(true);
} else {
sessionPoolOptionsBuilder = SessionPoolOptions.newBuilder().setUseMultiplexed(true);
}

SpannerOptions.Builder optionsBuilder =
request
.getAction()
.getSpannerOptions()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think getSpannerOptions will throw a NullPointerException since no one is passing it today. Add a handling similar to what you did for SessionPoolOptions above.

Copy link
Contributor Author

@harshachinta harshachinta Apr 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The autogenerated methods from the proto always returns a default instance of the class and not null. In this case getAction(), getSpannerOptions etc will never be null eventhough they are not set in input. Hence, we are safe from not getting an NullPointerException
Reference:

public com.google.spanner.executor.v1.SpannerAction getAction() {
return action_ == null
? com.google.spanner.executor.v1.SpannerAction.getDefaultInstance()
: action_;

.toBuilder()
.setSessionPoolOptions(sessionPoolOptionsBuilder);
SpannerAction.Builder actionBuilder =
request.getAction().toBuilder().setSpannerOptions(optionsBuilder);
request = request.toBuilder().setAction(actionBuilder).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we don't have unit tests, can you please link a before/after log object of this request object before and after your change? This is to ensure we are not mis-constructing the entire input object and avoid bugs like above.

Copy link
Contributor Author

@harshachinta harshachinta Apr 23, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have added a short log here.

Request object Before:
Screenshot 2024-04-23 at 3 19 16 PM

Request Object after adding multiplexed session
Screenshot 2024-04-23 at 3 24 38 PM

LOGGER.log(
Level.INFO,
String.format("Updated request to set multiplexed session flag: \n%s", request));
}
Status status = clientExecutor.startHandlingRequest(request, executionContext);
if (!status.isOk()) {
LOGGER.log(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,21 @@ public class WorkerProxy {
private static final String OPTION_SERVICE_KEY_FILE = "service_key_file";
private static final String OPTION_USE_PLAIN_TEXT_CHANNEL = "use_plain_text_channel";
private static final String OPTION_ENABLE_GRPC_FAULT_INJECTOR = "enable_grpc_fault_injector";
private static final String OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO =
"multiplexed_session_operations_ratio";

public static int spannerPort = 0;
public static int proxyPort = 0;
public static String cert = "";
public static String serviceKeyFile = "";
public static double multiplexedSessionOperationsRatio = 0.0;
public static boolean usePlainTextChannel = false;
public static boolean enableGrpcFaultInjector = false;

public static CommandLine commandLine;

private static final int MIN_PORT = 0, MAX_PORT = 65535;
private static final double MIN_RATIO = 0.0, MAX_RATIO = 1.0;

public static void main(String[] args) throws Exception {
commandLine = buildOptions(args);
Expand Down Expand Up @@ -95,10 +99,30 @@ public static void main(String[] args) throws Exception {
usePlainTextChannel = commandLine.hasOption(OPTION_USE_PLAIN_TEXT_CHANNEL);
enableGrpcFaultInjector = commandLine.hasOption(OPTION_ENABLE_GRPC_FAULT_INJECTOR);

if (commandLine.hasOption(OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you made sure this works when the value is null? Do we have some manual logs to validate that this is working?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO flag value will never be null. The default value passed from systest is 0.0.

multiplexedSessionOperationsRatio =
Double.parseDouble(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: If value is not null, we can add a validation that it is between 0 and 100. Similar validation is present for ports.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a validation for multiplexed session ratio flag value to be between 0.0 and 1.0

commandLine.getOptionValue(OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO));
LOGGER.log(
Level.INFO,
String.format(
"Multiplexed session ratio from commandline arg: \n%s",
multiplexedSessionOperationsRatio));
if (multiplexedSessionOperationsRatio < MIN_RATIO
|| multiplexedSessionOperationsRatio > MAX_RATIO) {
throw new IllegalArgumentException(
"Spanner multiplexedSessionOperationsRatio must be between "
+ MIN_RATIO
+ " and "
+ MAX_RATIO);
}
}

Server server;
while (true) {
try {
CloudExecutorImpl cloudExecutorImpl = new CloudExecutorImpl(enableGrpcFaultInjector);
CloudExecutorImpl cloudExecutorImpl =
new CloudExecutorImpl(enableGrpcFaultInjector, multiplexedSessionOperationsRatio);
HealthStatusManager healthStatusManager = new HealthStatusManager();
// Set up Cloud server.
server =
Expand Down Expand Up @@ -139,6 +163,11 @@ private static CommandLine buildOptions(String[] args) {
OPTION_ENABLE_GRPC_FAULT_INJECTOR,
false,
"Enable grpc fault injector in cloud client executor.");
options.addOption(
null,
OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO,
true,
"Ratio of operations to use multiplexed sessions.");

CommandLineParser parser = new DefaultParser();
try {
Expand Down