Skip to content

Commit 1d91283

Browse files
chore(spanner): add multiplexed session flag in executor code (#3030)
* chore(spanner): add multiplexed session flag for executor code * chore(spanner): clirr fix * chore(spanner): refactor doublt to boolean flag * chore(spanner): clirr fix * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * chore(spanner): code refactoring * chore(spanner): add validation for ratio flag * chore(spanner): add logs --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 6f522dd commit 1d91283

File tree

3 files changed

+79
-2
lines changed

3 files changed

+79
-2
lines changed
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
3+
<differences>
4+
<difference>
5+
<differenceType>7004</differenceType>
6+
<className>com/google/cloud/executor/spanner/CloudExecutorImpl</className>
7+
<method>CloudExecutorImpl(boolean)</method>
8+
<to>CloudExecutorImpl(boolean, double)</to>
9+
</difference>
10+
</differences>

google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/CloudExecutorImpl.java

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818

1919
import com.google.cloud.spanner.ErrorCode;
2020
import com.google.cloud.spanner.SpannerExceptionFactory;
21+
import com.google.spanner.executor.v1.SessionPoolOptions;
22+
import com.google.spanner.executor.v1.SpannerAction;
2123
import com.google.spanner.executor.v1.SpannerAsyncActionRequest;
2224
import com.google.spanner.executor.v1.SpannerAsyncActionResponse;
2325
import com.google.spanner.executor.v1.SpannerExecutorProxyGrpc;
26+
import com.google.spanner.executor.v1.SpannerOptions;
2427
import io.grpc.Status;
2528
import io.grpc.stub.StreamObserver;
2629
import java.util.logging.Level;
@@ -34,8 +37,13 @@ public class CloudExecutorImpl extends SpannerExecutorProxyGrpc.SpannerExecutorP
3437
// Executors to proxy.
3538
private final CloudClientExecutor clientExecutor;
3639

37-
public CloudExecutorImpl(boolean enableGrpcFaultInjector) {
40+
// Ratio of operations to use multiplexed sessions.
41+
private final double multiplexedSessionOperationsRatio;
42+
43+
public CloudExecutorImpl(
44+
boolean enableGrpcFaultInjector, double multiplexedSessionOperationsRatio) {
3845
clientExecutor = new CloudClientExecutor(enableGrpcFaultInjector);
46+
this.multiplexedSessionOperationsRatio = multiplexedSessionOperationsRatio;
3947
}
4048

4149
/** Execute SpannerAsync action requests. */
@@ -48,6 +56,36 @@ public StreamObserver<SpannerAsyncActionRequest> executeActionAsync(
4856
@Override
4957
public void onNext(SpannerAsyncActionRequest request) {
5058
LOGGER.log(Level.INFO, String.format("Receiving request: \n%s", request));
59+
60+
// Use Multiplexed sessions for all supported operations if the
61+
// multiplexedSessionOperationsRatio from command line is > 0.0
62+
if (multiplexedSessionOperationsRatio > 0.0) {
63+
SessionPoolOptions.Builder sessionPoolOptionsBuilder;
64+
if (request.getAction().getSpannerOptions().hasSessionPoolOptions()) {
65+
sessionPoolOptionsBuilder =
66+
request
67+
.getAction()
68+
.getSpannerOptions()
69+
.getSessionPoolOptions()
70+
.toBuilder()
71+
.setUseMultiplexed(true);
72+
} else {
73+
sessionPoolOptionsBuilder = SessionPoolOptions.newBuilder().setUseMultiplexed(true);
74+
}
75+
76+
SpannerOptions.Builder optionsBuilder =
77+
request
78+
.getAction()
79+
.getSpannerOptions()
80+
.toBuilder()
81+
.setSessionPoolOptions(sessionPoolOptionsBuilder);
82+
SpannerAction.Builder actionBuilder =
83+
request.getAction().toBuilder().setSpannerOptions(optionsBuilder);
84+
request = request.toBuilder().setAction(actionBuilder).build();
85+
LOGGER.log(
86+
Level.INFO,
87+
String.format("Updated request to set multiplexed session flag: \n%s", request));
88+
}
5189
Status status = clientExecutor.startHandlingRequest(request, executionContext);
5290
if (!status.isOk()) {
5391
LOGGER.log(

google-cloud-spanner-executor/src/main/java/com/google/cloud/executor/spanner/WorkerProxy.java

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,17 +45,21 @@ public class WorkerProxy {
4545
private static final String OPTION_SERVICE_KEY_FILE = "service_key_file";
4646
private static final String OPTION_USE_PLAIN_TEXT_CHANNEL = "use_plain_text_channel";
4747
private static final String OPTION_ENABLE_GRPC_FAULT_INJECTOR = "enable_grpc_fault_injector";
48+
private static final String OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO =
49+
"multiplexed_session_operations_ratio";
4850

4951
public static int spannerPort = 0;
5052
public static int proxyPort = 0;
5153
public static String cert = "";
5254
public static String serviceKeyFile = "";
55+
public static double multiplexedSessionOperationsRatio = 0.0;
5356
public static boolean usePlainTextChannel = false;
5457
public static boolean enableGrpcFaultInjector = false;
5558

5659
public static CommandLine commandLine;
5760

5861
private static final int MIN_PORT = 0, MAX_PORT = 65535;
62+
private static final double MIN_RATIO = 0.0, MAX_RATIO = 1.0;
5963

6064
public static void main(String[] args) throws Exception {
6165
commandLine = buildOptions(args);
@@ -95,10 +99,30 @@ public static void main(String[] args) throws Exception {
9599
usePlainTextChannel = commandLine.hasOption(OPTION_USE_PLAIN_TEXT_CHANNEL);
96100
enableGrpcFaultInjector = commandLine.hasOption(OPTION_ENABLE_GRPC_FAULT_INJECTOR);
97101

102+
if (commandLine.hasOption(OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO)) {
103+
multiplexedSessionOperationsRatio =
104+
Double.parseDouble(
105+
commandLine.getOptionValue(OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO));
106+
LOGGER.log(
107+
Level.INFO,
108+
String.format(
109+
"Multiplexed session ratio from commandline arg: \n%s",
110+
multiplexedSessionOperationsRatio));
111+
if (multiplexedSessionOperationsRatio < MIN_RATIO
112+
|| multiplexedSessionOperationsRatio > MAX_RATIO) {
113+
throw new IllegalArgumentException(
114+
"Spanner multiplexedSessionOperationsRatio must be between "
115+
+ MIN_RATIO
116+
+ " and "
117+
+ MAX_RATIO);
118+
}
119+
}
120+
98121
Server server;
99122
while (true) {
100123
try {
101-
CloudExecutorImpl cloudExecutorImpl = new CloudExecutorImpl(enableGrpcFaultInjector);
124+
CloudExecutorImpl cloudExecutorImpl =
125+
new CloudExecutorImpl(enableGrpcFaultInjector, multiplexedSessionOperationsRatio);
102126
HealthStatusManager healthStatusManager = new HealthStatusManager();
103127
// Set up Cloud server.
104128
server =
@@ -139,6 +163,11 @@ private static CommandLine buildOptions(String[] args) {
139163
OPTION_ENABLE_GRPC_FAULT_INJECTOR,
140164
false,
141165
"Enable grpc fault injector in cloud client executor.");
166+
options.addOption(
167+
null,
168+
OPTION_MULTIPLEXED_SESSION_OPERATIONS_RATIO,
169+
true,
170+
"Ratio of operations to use multiplexed sessions.");
142171

143172
CommandLineParser parser = new DefaultParser();
144173
try {

0 commit comments

Comments
 (0)