Skip to content

chore: create and reuse a base CallContext #3816

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 15, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,8 @@ public class GapicSpannerRpc implements SpannerRpc {

private Supplier<Boolean> directPathEnabledSupplier = () -> false;

private final GrpcCallContext baseGrpcCallContext;

public static GapicSpannerRpc create(SpannerOptions options) {
return new GapicSpannerRpc(options);
}
Expand Down Expand Up @@ -333,6 +335,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
this.endToEndTracingEnabled = options.isEndToEndTracingEnabled();
this.numChannels = options.getNumChannels();
this.isGrpcGcpExtensionEnabled = options.isGrpcGcpExtensionEnabled();
this.baseGrpcCallContext = createBaseCallContext();

if (initializeStubs) {
// First check if SpannerOptions provides a TransportChannelProvider. Create one
Expand Down Expand Up @@ -1978,6 +1981,20 @@ private static <T> T get(final Future<T> future) throws SpannerException {
}
}

private GrpcCallContext createBaseCallContext() {
GrpcCallContext context = GrpcCallContext.createDefault();
if (compressorName != null) {
// This sets the compressor for Client -> Server.
context = context.withCallOptions(context.getCallOptions().withCompression(compressorName));
}
if (endToEndTracingEnabled) {
context = context.withExtraHeaders(metadataProvider.newEndToEndTracingHeader());
}
return context
.withStreamWaitTimeoutDuration(waitTimeout)
.withStreamIdleTimeoutDuration(idleTimeout);
}

// Before removing this method, please verify with a code owner that it is not used
// in any internal testing infrastructure.
@VisibleForTesting
Expand All @@ -2002,7 +2019,7 @@ <ReqT, RespT> GrpcCallContext newCallContext(
ReqT request,
MethodDescriptor<ReqT, RespT> method,
boolean routeToLeader) {
GrpcCallContext context = GrpcCallContext.createDefault();
GrpcCallContext context = this.baseGrpcCallContext;
if (options != null) {
if (this.isGrpcGcpExtensionEnabled) {
// Set channel affinity in gRPC-GCP.
Expand All @@ -2019,28 +2036,17 @@ <ReqT, RespT> GrpcCallContext newCallContext(
context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue());
}
}
if (compressorName != null) {
// This sets the compressor for Client -> Server.
context = context.withCallOptions(context.getCallOptions().withCompression(compressorName));
}
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
if (routeToLeader && leaderAwareRoutingEnabled) {
context = context.withExtraHeaders(metadataProvider.newRouteToLeaderHeader());
}
if (endToEndTracingEnabled) {
context = context.withExtraHeaders(metadataProvider.newEndToEndTracingHeader());
}
if (callCredentialsProvider != null) {
CallCredentials callCredentials = callCredentialsProvider.getCallCredentials();
if (callCredentials != null) {
context =
context.withCallOptions(context.getCallOptions().withCallCredentials(callCredentials));
}
}
context =
context
.withStreamWaitTimeoutDuration(waitTimeout)
.withStreamIdleTimeoutDuration(idleTimeout);
CallContextConfigurator configurator = SpannerOptions.CALL_CONTEXT_CONFIGURATOR_KEY.get();
ApiCallContext apiCallContextFromContext = null;
if (configurator != null) {
Expand Down
Loading