Skip to content

Commit 670f30d

Browse files
committed
More debugging + fix up sorting comparator
1 parent 8f3e7ba commit 670f30d

File tree

7 files changed

+84
-50
lines changed

7 files changed

+84
-50
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -458,24 +458,29 @@ void initTransaction() {
458458

459459
private void initTransactionInternal(BeginTransactionRequest request) {
460460
XGoogSpannerRequestId reqId =
461-
session.getRequestIdCreator().nextRequestId(1 /*TODO: retrieve channelId*/, 1);
461+
session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
462462
try {
463463
Transaction transaction =
464464
rpc.beginTransaction(
465465
request, reqId.withOptions(getTransactionChannelHint()), isRouteToLeader());
466466
if (!transaction.hasReadTimestamp()) {
467467
throw SpannerExceptionFactory.newSpannerException(
468-
ErrorCode.INTERNAL, "Missing expected transaction.read_timestamp metadata field");
468+
ErrorCode.INTERNAL,
469+
"Missing expected transaction.read_timestamp metadata field",
470+
reqId);
469471
}
470472
if (transaction.getId().isEmpty()) {
471473
throw SpannerExceptionFactory.newSpannerException(
472-
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field");
474+
ErrorCode.INTERNAL, "Missing expected transaction.id metadata field", reqId);
473475
}
474476
try {
475477
timestamp = Timestamp.fromProto(transaction.getReadTimestamp());
476478
} catch (IllegalArgumentException e) {
477479
throw SpannerExceptionFactory.newSpannerException(
478-
ErrorCode.INTERNAL, "Bad value in transaction.read_timestamp metadata field", e);
480+
ErrorCode.INTERNAL,
481+
"Bad value in transaction.read_timestamp metadata field",
482+
e,
483+
reqId);
479484
}
480485
transactionId = transaction.getId();
481486
span.addAnnotation(

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,14 +82,13 @@ long executeStreamingPartitionedUpdate(
8282
Options options = Options.fromUpdateOptions(updateOptions);
8383
XGoogSpannerRequestId reqId = options.reqId();
8484
if (reqId == null) {
85-
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 0);
85+
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
8686
}
8787

8888
try {
8989
ExecuteSqlRequest request = newTransactionRequestFrom(statement, options);
9090

9191
while (true) {
92-
reqId.incrementAttempt();
9392
final Duration remainingTimeout = tryUpdateTimeout(timeout, stopwatch);
9493

9594
try {
@@ -110,6 +109,7 @@ long executeStreamingPartitionedUpdate(
110109
} catch (UnavailableException e) {
111110
LOGGER.log(
112111
Level.FINER, "Retrying PartitionedDml transaction after UnavailableException", e);
112+
reqId.incrementAttempt();
113113
request = resumeOrRestartRequest(resumeToken, statement, request, options);
114114
} catch (InternalException e) {
115115
if (!isRetryableInternalErrorPredicate.apply(e)) {
@@ -118,6 +118,7 @@ long executeStreamingPartitionedUpdate(
118118

119119
LOGGER.log(
120120
Level.FINER, "Retrying PartitionedDml transaction after InternalException - EOS", e);
121+
reqId.incrementAttempt();
121122
request = resumeOrRestartRequest(resumeToken, statement, request, options);
122123
} catch (AbortedException e) {
123124
LOGGER.log(Level.FINER, "Retrying PartitionedDml transaction after AbortedException", e);
@@ -126,7 +127,7 @@ long executeStreamingPartitionedUpdate(
126127
updateCount = 0L;
127128
request = newTransactionRequestFrom(statement, options);
128129
// Create a new xGoogSpannerRequestId.
129-
reqId = session.getRequestIdCreator().nextRequestId(1 /*TODO: infer channelId*/, 0);
130+
reqId = session.getRequestIdCreator().nextRequestId(session.getChannel(), 1);
130131
} catch (SpannerException e) {
131132
e.setRequestId(reqId);
132133
throw e;
@@ -141,7 +142,7 @@ long executeStreamingPartitionedUpdate(
141142
LOGGER.log(Level.FINER, "Finished PartitionedUpdate statement");
142143
return updateCount;
143144
} catch (Exception e) {
144-
throw SpannerExceptionFactory.newSpannerException(e);
145+
throw SpannerExceptionFactory.newSpannerException(e, reqId);
145146
}
146147
}
147148

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionClient.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ interface SessionConsumer {
191191
// SessionClient is created long before a DatabaseClientImpl is created,
192192
// as batch sessions are firstly created then later attached to each Client.
193193
private static final AtomicInteger NTH_ID = new AtomicInteger(0);
194-
private final int nthId = NTH_ID.incrementAndGet();
194+
private final int nthId;
195195
private final AtomicInteger nthRequest = new AtomicInteger(0);
196196

197197
@GuardedBy("this")
@@ -206,6 +206,7 @@ interface SessionConsumer {
206206
this.executorFactory = executorFactory;
207207
this.executor = executorFactory.get();
208208
this.commonAttributes = spanner.getTracer().createCommonAttributes(db);
209+
this.nthId = NTH_ID.incrementAndGet();
209210
}
210211

211212
@Override
@@ -223,8 +224,10 @@ DatabaseId getDatabaseId() {
223224

224225
@Override
225226
public XGoogSpannerRequestId nextRequestId(long channelId, int attempt) {
226-
return XGoogSpannerRequestId.of(
227-
this.nthId, this.nthRequest.incrementAndGet(), channelId, attempt);
227+
long nthReq = this.nthRequest.incrementAndGet();
228+
// System.out.println("\033[36mnthRequest.addr: " + System.identityHashCode(this.nthRequest) + "
229+
// value: " + nthReq + "\033[00m");
230+
return XGoogSpannerRequestId.of(this.nthId, nthReq, channelId, attempt);
228231
}
229232

230233
/** Create a single session. */
@@ -424,7 +427,7 @@ private List<SessionImpl> internalBatchCreateSessions(
424427
span.addAnnotation(String.format("Requesting %d sessions", sessionCount));
425428
try (IScope s = spanner.getTracer().withSpan(span)) {
426429
XGoogSpannerRequestId reqId =
427-
XGoogSpannerRequestId.of(this.nthId, this.nthRequest.incrementAndGet(), channelHint, 1);
430+
XGoogSpannerRequestId.of(this.nthId, channelHint, this.nthRequest.incrementAndGet(), 1);
428431
List<com.google.spanner.v1.Session> sessions =
429432
spanner
430433
.getRpc()

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -465,17 +465,15 @@ public AsyncTransactionManagerImpl transactionManagerAsync(TransactionOption...
465465

466466
@Override
467467
public ApiFuture<Empty> asyncClose() {
468-
XGoogSpannerRequestId reqId =
469-
this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
468+
XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
470469
return spanner.getRpc().asyncDeleteSession(getName(), reqId.withOptions(getOptions()));
471470
}
472471

473472
@Override
474473
public void close() {
475474
ISpan span = tracer.spanBuilder(SpannerImpl.DELETE_SESSION);
476475
try (IScope s = tracer.withSpan(span)) {
477-
XGoogSpannerRequestId reqId =
478-
this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
476+
XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
479477
spanner.getRpc().deleteSession(getName(), reqId.withOptions(getOptions()));
480478
} catch (RuntimeException e) {
481479
span.setStatus(e);
@@ -506,8 +504,7 @@ ApiFuture<Transaction> beginTransactionAsync(
506504
}
507505
final BeginTransactionRequest request = requestBuilder.build();
508506
final ApiFuture<Transaction> requestFuture;
509-
XGoogSpannerRequestId reqId =
510-
this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
507+
XGoogSpannerRequestId reqId = this.getRequestIdCreator().nextRequestId(this.getChannel(), 1);
511508
try (IScope ignore = tracer.withSpan(span)) {
512509
requestFuture =
513510
spanner

google-cloud-spanner/src/main/java/com/google/cloud/spanner/XGoogSpannerRequestId.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,12 +106,29 @@ public String toString() {
106106
this.attempt);
107107
}
108108

109+
public String debugToString() {
110+
return String.format(
111+
"%d.%s.nth_client=%d.nth_chan=%d.nth_req=%d.attempt=%d",
112+
XGoogSpannerRequestId.VERSION,
113+
XGoogSpannerRequestId.RAND_PROCESS_ID,
114+
this.nthClientId,
115+
this.nthChannelId,
116+
this.nthRequest,
117+
this.attempt);
118+
}
119+
109120
@VisibleForTesting
110121
boolean isGreaterThan(XGoogSpannerRequestId other) {
111-
return this.nthClientId > other.nthClientId
112-
&& this.nthChannelId > other.nthChannelId
113-
&& this.nthRequest > other.nthRequest
114-
&& this.attempt > other.attempt;
122+
if (this.nthClientId != other.nthClientId) {
123+
return this.nthClientId > other.nthClientId;
124+
}
125+
if (this.nthChannelId != other.nthChannelId) {
126+
return this.nthChannelId > other.nthChannelId;
127+
}
128+
if (this.nthRequest != other.nthRequest) {
129+
return this.nthRequest > other.nthRequest;
130+
}
131+
return this.attempt > other.attempt;
115132
}
116133

117134
@Override

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2910,20 +2910,32 @@ public void testPartitionedDmlDoesNotTimeout() {
29102910
DatabaseClientImpl dbImpl = ((DatabaseClientImpl) client);
29112911
int channelId = dbImpl.getSession().getChannel();
29122912
int dbId = dbImpl.dbId;
2913+
XGoogSpannerRequestIdTest.MethodAndRequestId[] wantStreamingValues = {
2914+
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
2915+
"google.spanner.v1.Spanner/ExecuteStreamingSql",
2916+
new XGoogSpannerRequestId(dbId, channelId, 1, 1)),
2917+
};
2918+
xGoogReqIdInterceptor.checkExpectedStreamingXGoogRequestIds(wantStreamingValues);
2919+
29132920
XGoogSpannerRequestIdTest.MethodAndRequestId[] wantUnaryValues = {
29142921
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
29152922
"google.spanner.v1.Spanner/BatchCreateSessions",
2916-
new XGoogSpannerRequestId(1, dbId, channelId, 1)),
2923+
new XGoogSpannerRequestId(dbId, 0, 4, 1)),
29172924
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
29182925
"google.spanner.v1.Spanner/BatchCreateSessions",
2919-
new XGoogSpannerRequestId(1, dbId, channelId, 1)),
2920-
};
2921-
XGoogSpannerRequestIdTest.MethodAndRequestId[] wantStreamingValues = {
2926+
new XGoogSpannerRequestId(dbId, 1, 2, 1)),
29222927
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
2923-
"google.spanner.v1.Spanner/ExecuteStreamingSql",
2924-
new XGoogSpannerRequestId(1, channelId, dbId, 2)),
2928+
"google.spanner.v1.Spanner/BatchCreateSessions",
2929+
new XGoogSpannerRequestId(dbId, 2, 3, 1)),
2930+
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
2931+
"google.spanner.v1.Spanner/BatchCreateSessions",
2932+
new XGoogSpannerRequestId(dbId, 3, 1, 1)),
2933+
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
2934+
"google.spanner.v1.Spanner/BeginTransaction", new XGoogSpannerRequestId(dbId, 3, 1, 1)),
2935+
XGoogSpannerRequestIdTest.ofMethodAndRequestId(
2936+
"google.spanner.v1.Spanner/ExecuteSql",
2937+
new XGoogSpannerRequestId(dbId, channelId, 5, 1)),
29252938
};
2926-
xGoogReqIdInterceptor.checkExpectedStreamingXGoogRequestIds(wantStreamingValues);
29272939
xGoogReqIdInterceptor.checkExpectedUnaryXGoogRequestIds(wantUnaryValues);
29282940
}
29292941
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/XGoogSpannerRequestIdTest.java

Lines changed: 20 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -183,9 +183,6 @@ public MethodAndRequestId[] accumulatedStreamingValues() {
183183

184184
public void checkExpectedUnaryXGoogRequestIds(MethodAndRequestId... wantUnaryValues) {
185185
MethodAndRequestId[] gotUnaryValues = this.accumulatedUnaryValues();
186-
for (int i = 0; i < gotUnaryValues.length; i++) {
187-
System.out.println("\033[34misUnary: #" + i + ":: " + gotUnaryValues[i] + "\033[00m");
188-
}
189186
sortValues(gotUnaryValues);
190187
for (int i = 0; i < gotUnaryValues.length; i++) {
191188
System.out.println("\033[33misUnary: #" + i + ":: " + gotUnaryValues[i] + "\033[00m");
@@ -194,14 +191,15 @@ public void checkExpectedUnaryXGoogRequestIds(MethodAndRequestId... wantUnaryVal
194191
}
195192

196193
private void sortValues(MethodAndRequestId[] values) {
197-
Arrays.sort(values, new MethodAndRequestIdComparator());
194+
Arrays.sort(values, new MethodAndRequestIdComparator());
198195
}
199196

200197
public void checkExpectedStreamingXGoogRequestIds(MethodAndRequestId... wantStreamingValues) {
201198
MethodAndRequestId[] gotStreamingValues = this.accumulatedStreamingValues();
202199
sortValues(gotStreamingValues);
203200
for (int i = 0; i < gotStreamingValues.length; i++) {
204-
System.out.println("\033[32misStreaming: #" + i + ":: " + gotStreamingValues[i] + "\033[00m");
201+
System.out.println(
202+
"\033[32misStreaming: #" + i + ":: " + gotStreamingValues[i] + "\033[00m");
205203
}
206204
assertEquals(wantStreamingValues, gotStreamingValues);
207205
}
@@ -223,34 +221,35 @@ public MethodAndRequestId(String method, XGoogSpannerRequestId requestId) {
223221
}
224222

225223
public String toString() {
226-
return "{" + this.method + ":" + this.requestId.toString() + "}";
224+
return "{" + this.method + ":" + this.requestId.debugToString() + "}";
227225
}
228226

229227
@Override
230228
public boolean equals(Object o) {
231229
if (!(o instanceof MethodAndRequestId)) {
232-
return false;
230+
return false;
233231
}
234232
MethodAndRequestId other = (MethodAndRequestId) o;
235-
return Objects.equals(this.method, other.method) && Objects.equals(this.requestId, other.requestId);
233+
return Objects.equals(this.method, other.method)
234+
&& Objects.equals(this.requestId, other.requestId);
236235
}
237236
}
238237

239238
static class MethodAndRequestIdComparator implements Comparator<MethodAndRequestId> {
240-
@Override
241-
public int compare(MethodAndRequestId mr1, MethodAndRequestId mr2) {
242-
int cmpMethod = mr1.method.compareTo(mr2.method);
243-
if (cmpMethod != 0) {
244-
return cmpMethod;
245-
}
246-
if (Objects.equals(mr1.requestId, mr2.requestId)) {
247-
return 0;
248-
}
249-
if (mr1.requestId.isGreaterThan(mr2.requestId)) {
250-
return +1;
251-
}
252-
return -1;
239+
@Override
240+
public int compare(MethodAndRequestId mr1, MethodAndRequestId mr2) {
241+
int cmpMethod = mr1.method.compareTo(mr2.method);
242+
if (cmpMethod != 0) {
243+
return cmpMethod;
253244
}
245+
if (Objects.equals(mr1.requestId, mr2.requestId)) {
246+
return 0;
247+
}
248+
if (mr1.requestId.isGreaterThan(mr2.requestId)) {
249+
return +1;
250+
}
251+
return -1;
252+
}
254253
}
255254

256255
public static MethodAndRequestId ofMethodAndRequestId(String method, String reqId) {

0 commit comments

Comments
 (0)