-
Notifications
You must be signed in to change notification settings - Fork 132
chore(spanner): add multiplexed session flag in executor code #3030
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
9321d83
121f4e0
82fbc76
df425b8
f37f8c8
9aaf1b2
b3d94ba
d62fc41
d75e53b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html --> | ||
<differences> | ||
<difference> | ||
<differenceType>7004</differenceType> | ||
<className>com/google/cloud/executor/spanner/CloudExecutorImpl</className> | ||
<method>CloudExecutorImpl(boolean)</method> | ||
<to>CloudExecutorImpl(boolean, double)</to> | ||
</difference> | ||
</differences> |
Original file line number | Diff line number | Diff line change | ||||||||
---|---|---|---|---|---|---|---|---|---|---|
|
@@ -18,9 +18,12 @@ | |||||||||
|
||||||||||
import com.google.cloud.spanner.ErrorCode; | ||||||||||
import com.google.cloud.spanner.SpannerExceptionFactory; | ||||||||||
import com.google.spanner.executor.v1.SessionPoolOptions; | ||||||||||
import com.google.spanner.executor.v1.SpannerAction; | ||||||||||
import com.google.spanner.executor.v1.SpannerAsyncActionRequest; | ||||||||||
import com.google.spanner.executor.v1.SpannerAsyncActionResponse; | ||||||||||
import com.google.spanner.executor.v1.SpannerExecutorProxyGrpc; | ||||||||||
import com.google.spanner.executor.v1.SpannerOptions; | ||||||||||
import io.grpc.Status; | ||||||||||
import io.grpc.stub.StreamObserver; | ||||||||||
import java.util.logging.Level; | ||||||||||
|
@@ -34,8 +37,13 @@ public class CloudExecutorImpl extends SpannerExecutorProxyGrpc.SpannerExecutorP | |||||||||
// Executors to proxy. | ||||||||||
private final CloudClientExecutor clientExecutor; | ||||||||||
|
||||||||||
public CloudExecutorImpl(boolean enableGrpcFaultInjector) { | ||||||||||
// Ratio of operations to use multiplexed sessions. | ||||||||||
private final double multiplexedSessionOperationsRatio; | ||||||||||
|
||||||||||
public CloudExecutorImpl( | ||||||||||
boolean enableGrpcFaultInjector, double multiplexedSessionOperationsRatio) { | ||||||||||
clientExecutor = new CloudClientExecutor(enableGrpcFaultInjector); | ||||||||||
this.multiplexedSessionOperationsRatio = multiplexedSessionOperationsRatio; | ||||||||||
} | ||||||||||
|
||||||||||
/** Execute SpannerAsync action requests. */ | ||||||||||
|
@@ -48,6 +56,36 @@ public StreamObserver<SpannerAsyncActionRequest> executeActionAsync( | |||||||||
@Override | ||||||||||
public void onNext(SpannerAsyncActionRequest request) { | ||||||||||
LOGGER.log(Level.INFO, String.format("Receiving request: \n%s", request)); | ||||||||||
|
||||||||||
// Use Multiplexed sessions for all supported operations if the | ||||||||||
// multiplexedSessionOperationsRatio from command line is > 0.0 | ||||||||||
if (multiplexedSessionOperationsRatio > 0.0) { | ||||||||||
SessionPoolOptions.Builder sessionPoolOptionsBuilder; | ||||||||||
if (request.getAction().getSpannerOptions().hasSessionPoolOptions()) { | ||||||||||
sessionPoolOptionsBuilder = | ||||||||||
request | ||||||||||
.getAction() | ||||||||||
.getSpannerOptions() | ||||||||||
.getSessionPoolOptions() | ||||||||||
.toBuilder() | ||||||||||
.setUseMultiplexed(true); | ||||||||||
} else { | ||||||||||
sessionPoolOptionsBuilder = SessionPoolOptions.newBuilder().setUseMultiplexed(true); | ||||||||||
} | ||||||||||
|
||||||||||
SpannerOptions.Builder optionsBuilder = | ||||||||||
request | ||||||||||
.getAction() | ||||||||||
.getSpannerOptions() | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The autogenerated methods from the proto always returns a default instance of the class and not Lines 112 to 115 in 6f522dd
|
||||||||||
.toBuilder() | ||||||||||
.setSessionPoolOptions(sessionPoolOptionsBuilder); | ||||||||||
SpannerAction.Builder actionBuilder = | ||||||||||
request.getAction().toBuilder().setSpannerOptions(optionsBuilder); | ||||||||||
request = request.toBuilder().setAction(actionBuilder).build(); | ||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we don't have unit tests, can you please link a before/after log object of this request object before and after your change? This is to ensure we are not mis-constructing the entire input object and avoid bugs like above. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have added a short log here. |
||||||||||
LOGGER.log( | ||||||||||
Level.INFO, | ||||||||||
String.format("Updated request to set multiplexed session flag: \n%s", request)); | ||||||||||
} | ||||||||||
Status status = clientExecutor.startHandlingRequest(request, executionContext); | ||||||||||
if (!status.isOk()) { | ||||||||||
LOGGER.log( | ||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,17 +45,21 @@ public class WorkerProxy { | |
private static final String OPTION_SERVICE_KEY_FILE = "service_key_file"; | ||
private static final String OPTION_USE_PLAIN_TEXT_CHANNEL = "use_plain_text_channel"; | ||
private static final String OPTION_ENABLE_GRPC_FAULT_INJECTOR = "enable_grpc_fault_injector"; | ||
private static final String OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO = | ||
"multiplexed_session_operations_ratio"; | ||
|
||
public static int spannerPort = 0; | ||
public static int proxyPort = 0; | ||
public static String cert = ""; | ||
public static String serviceKeyFile = ""; | ||
public static double multiplexedSessionOperationsRatio = 0.0; | ||
harshachinta marked this conversation as resolved.
Show resolved
Hide resolved
|
||
public static boolean usePlainTextChannel = false; | ||
public static boolean enableGrpcFaultInjector = false; | ||
|
||
public static CommandLine commandLine; | ||
|
||
private static final int MIN_PORT = 0, MAX_PORT = 65535; | ||
private static final double MIN_RATIO = 0.0, MAX_RATIO = 1.0; | ||
|
||
public static void main(String[] args) throws Exception { | ||
commandLine = buildOptions(args); | ||
|
@@ -95,10 +99,30 @@ public static void main(String[] args) throws Exception { | |
usePlainTextChannel = commandLine.hasOption(OPTION_USE_PLAIN_TEXT_CHANNEL); | ||
enableGrpcFaultInjector = commandLine.hasOption(OPTION_ENABLE_GRPC_FAULT_INJECTOR); | ||
|
||
if (commandLine.hasOption(OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Have you made sure this works when the value is null? Do we have some manual logs to validate that this is working? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
multiplexedSessionOperationsRatio = | ||
Double.parseDouble( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: If value is not null, we can add a validation that it is between 0 and 100. Similar validation is present for ports. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added a validation for multiplexed session ratio flag value to be between |
||
commandLine.getOptionValue(OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO)); | ||
LOGGER.log( | ||
Level.INFO, | ||
String.format( | ||
"Multiplexed session ratio from commandline arg: \n%s", | ||
multiplexedSessionOperationsRatio)); | ||
if (multiplexedSessionOperationsRatio < MIN_RATIO | ||
|| multiplexedSessionOperationsRatio > MAX_RATIO) { | ||
throw new IllegalArgumentException( | ||
"Spanner multiplexedSessionOperationsRatio must be between " | ||
+ MIN_RATIO | ||
+ " and " | ||
+ MAX_RATIO); | ||
} | ||
} | ||
|
||
Server server; | ||
while (true) { | ||
try { | ||
CloudExecutorImpl cloudExecutorImpl = new CloudExecutorImpl(enableGrpcFaultInjector); | ||
CloudExecutorImpl cloudExecutorImpl = | ||
new CloudExecutorImpl(enableGrpcFaultInjector, multiplexedSessionOperationsRatio); | ||
HealthStatusManager healthStatusManager = new HealthStatusManager(); | ||
// Set up Cloud server. | ||
server = | ||
|
@@ -139,6 +163,11 @@ private static CommandLine buildOptions(String[] args) { | |
OPTION_ENABLE_GRPC_FAULT_INJECTOR, | ||
false, | ||
"Enable grpc fault injector in cloud client executor."); | ||
options.addOption( | ||
null, | ||
OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO, | ||
true, | ||
"Ratio of operations to use multiplexed sessions."); | ||
|
||
CommandLineParser parser = new DefaultParser(); | ||
try { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add a check like
request.getAction().hasSpannerOptions()
before doinggetSpannerOptions
. Else this will throw a NullPointerException.Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The autogenerated methods from the proto always returns a default instance of the class and not
null
. In this casegetAction()
,getSpannerOptions
etc will never benull
. Hence, we are safe from not getting anNullPointerException
Reference:
java-spanner/proto-google-cloud-spanner-executor-v1/src/main/java/com/google/spanner/executor/v1/SpannerAsyncActionRequest.java
Lines 112 to 115 in 6f522dd
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok, got it.