Skip to content

Commit 0dd6fd8

Browse files
committed
chore(x-goog-spanner-request-id): assert expectations in tests for retries + aborts
Updates #3537 chore(x-goog-spanner-request-id): add BeginTransaction+ResumableStreamIterator Plumbs x-goog-spanner-request-id into BeginTransaction and ResumableStreamIterator and for PartitionedDmlTransaction. Updates #3537 Get more tests reveal needs for plumbing Add requestId to PartitionQuery + PartitionRead Propagate requestId into some more SpannerException values Use session.getChannel() and assert for results More debugging + fix up sorting comparator Fix more channelId TODOs Fix up withNthRequest for deterministic checks for BatchCreateSessions More retrofits More updates for tests More validity checks Add debugs to assert behavior of outgoing headers
1 parent 380ea90 commit 0dd6fd8

File tree

7 files changed

+426
-31
lines changed

7 files changed

+426
-31
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ class DatabaseClientImpl implements DatabaseClient {
4848
@VisibleForTesting final MultiplexedSessionDatabaseClient multiplexedSessionDatabaseClient;
4949
@VisibleForTesting final boolean useMultiplexedSessionPartitionedOps;
5050
@VisibleForTesting final boolean useMultiplexedSessionForRW;
51-
private final int dbId;
51+
@VisibleForTesting final int dbId;
5252
private final AtomicInteger nthRequest;
5353
private final Map<String, Integer> clientIdToOrdinalMap;
5454

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ long executeStreamingPartitionedUpdate(
9393
final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch);
9494

9595
try {
96+
System.out.println("\033[31mreqIdPump: " + reqId + "\033[00m");
9697
ServerStream<PartialResultSet> stream =
9798
rpc.executeStreamingPartitionedDml(
9899
request, reqId.withOptions(session.getOptions()), remainingTimeout);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ private void backoffSleep(Context context, long backoffMillis) throws SpannerExc
198198

199199
public void ensureNonNullXGoogRequestId() {
200200
if (this.xGoogRequestId == null) {
201+
System.out.println(
202+
"\033[34mXGoogRequestId.ensureNonNull: "
203+
+ this.xGoogRequestId
204+
+ " for:: "
205+
+ System.identityHashCode(this)
206+
+ "\033[00m");
201207
this.xGoogRequestId =
202208
this.xGoogRequestIdCreator.nextRequestId(1 /*TODO: infer channelId*/, 1 /*attempt*/);
203209
}
@@ -206,6 +212,11 @@ public void ensureNonNullXGoogRequestId() {
206212
public void incrementXGoogRequestIdAttempt() {
207213
this.ensureNonNullXGoogRequestId();
208214
this.xGoogRequestId.incrementAttempt();
215+
System.out.println(
216+
"\033[35mincrementXGoogAttempt: "
217+
+ this.xGoogRequestId
218+
+ " :: "
219+
+ System.identityHashCode(this));
209220
}
210221

211222
private enum DirectExecutor implements Executor {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ interface SessionConsumer {
190190
// SessionClient is created long before a DatabaseClientImpl is created,
191191
// as batch sessions are firstly created then later attached to each Client.
192192
private static final AtomicInteger NTH_ID = new AtomicInteger(0);
193-
private final int nthId = NTH_ID.incrementAndGet();
193+
private final int nthId;
194194
private final AtomicInteger nthRequest = new AtomicInteger(0);
195195

196196
@GuardedBy("this")
@@ -205,6 +205,7 @@ interface SessionConsumer {
205205
this.executorFactory = executorFactory;
206206
this.executor = executorFactory.get();
207207
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
208+
this.nthId = NTH_ID.incrementAndGet();
208209
}
209210

210211
@Override
@@ -223,7 +224,7 @@ DatabaseId getDatabaseId() {
223224
@Override
224225
public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
225226
return XGoogSpannerRequestId.of(
226-
this.nthId, this.nthRequest.incrementAndGet(), channelId, attempt);
227+
this.nthId, channelId, this.nthRequest.incrementAndGet(), attempt);
227228
}
228229

229230
/** Create a single session. */
@@ -423,7 +424,7 @@ private List<SessionImpl> internalBatchCreateSessions(
423424
span.addAnnotation(String.format("Requesting %d sessions", sessionCount));
424425
try (IScope s = spanner.getTracer().withSpan(span)) {
425426
XGoogSpannerRequestId reqId =
426-
XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelHint, 1);
427+
XGoogSpannerRequestId.of(this.nthId, channelHint, this.nthRequest.incrementAndGet(), 1);
427428
List<com.google.spanner.v1.Session> sessions =
428429
spanner
429430
.getRpc()

google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java

Lines changed: 28 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2042,7 +2042,8 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20422042
}
20432043
}
20442044
if (options != null) {
2045-
context = withRequestId(context, options);
2045+
// TODO(@odeke-em): Infer the affinity if it doesn't match up with in the request-id.
2046+
context = withRequestId(context, options, method);
20462047
}
20472048
context = context.withExtraHeaders(metadataProvider.newExtraHeaders(resource, projectName));
20482049
if (routeToLeader && leaderAwareRoutingEnabled) {
@@ -2060,15 +2061,40 @@ <ReqT, RespT> GrpcCallContext newCallContext(
20602061
if (configurator != null) {
20612062
apiCallContextFromContext = configurator.configure(context, request, method);
20622063
}
2064+
2065+
// Debug the call headers before this.
2066+
Map<String, List<String>> hdrs = context.getExtraHeaders();
2067+
if (method.getFullMethodName().compareTo("google.spanner.v1.Spanner/DeleteSession") != 0) {
2068+
System.out.println(
2069+
"\033[32mextraHeaders going out for " + method.getFullMethodName() + "\033[00m");
2070+
for (Map.Entry<String, List<String>> entry : hdrs.entrySet()) {
2071+
System.out.println(
2072+
"\t\033[36mcall.Key: " + entry.getKey() + ":: " + entry.getValue() + "\033[00m");
2073+
}
2074+
}
20632075
return (GrpcCallContext) context.merge(apiCallContextFromContext);
20642076
}
20652077

2066-
GrpcCallContext withRequestId(GrpcCallContext context, Map<SpannerRpc.Option, ?> options) {
2078+
<ReqT, RespT> GrpcCallContext withRequestId(
2079+
GrpcCallContext context,
2080+
Map<SpannerRpc.Option, ?> options,
2081+
MethodDescriptor<ReqT, RespT> method) {
20672082
XGoogSpannerRequestId reqId = (XGoogSpannerRequestId) options.get(Option.REQUEST_ID);
20682083
if (reqId == null) {
20692084
return context;
20702085
}
20712086

2087+
String methodName = method.getFullMethodName();
2088+
if (methodName.compareTo("google.spanner.v1.Spanner/ExecuteStreamingSql") == 0) {
2089+
System.out.println(
2090+
"\033[36mGapiSpannerRpc.withRequestId: "
2091+
+ reqId
2092+
+ " for: "
2093+
+ method.getFullMethodName()
2094+
+ " "
2095+
+ System.identityHashCode(context)
2096+
+ "\033[00m");
2097+
}
20722098
Map<String, List<String>> withReqId =
20732099
ImmutableMap.of(
20742100
XGoogSpannerRequestId.REQUEST_HEADER_KEY.name(),

0 commit comments

Comments
 (0)