Skip to content

Commit c74369b

Browse files
authored
chore(spanner): fix code and tests (#3636)
1 parent c8d1e63 commit c74369b

File tree

6 files changed

+37
-70
lines changed

6 files changed

+37
-70
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncTransactionManagerImpl.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void setSpan(ISpan span) {
5555

5656
@Override
5757
public void close() {
58-
closeAsync();
58+
SpannerApiFutures.get(closeAsync());
5959
}
6060

6161
@Override
@@ -183,6 +183,10 @@ public ApiFuture<Void> rollbackAsync() {
183183

184184
@Override
185185
public TransactionContextFuture resetForRetryAsync() {
186+
if (txn == null || !txn.isAborted() && txnState != TransactionState.ABORTED) {
187+
throw new IllegalStateException(
188+
"resetForRetry can only be called if the previous attempt aborted");
189+
}
186190
return new TransactionContextFutureImpl(this, internalBeginAsync(false));
187191
}
188192

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncTransactionManagerImplTest.java

Lines changed: 0 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,14 @@
1616

1717
package com.google.cloud.spanner;
1818

19-
import static org.junit.Assert.assertThrows;
2019
import static org.mockito.ArgumentMatchers.any;
2120
import static org.mockito.ArgumentMatchers.eq;
22-
import static org.mockito.Mockito.clearInvocations;
23-
import static org.mockito.Mockito.doThrow;
2421
import static org.mockito.Mockito.mock;
2522
import static org.mockito.Mockito.verify;
2623
import static org.mockito.Mockito.when;
2724

2825
import com.google.api.core.ApiFutures;
2926
import com.google.cloud.Timestamp;
30-
import com.google.protobuf.ByteString;
3127
import io.opentelemetry.api.trace.Span;
3228
import io.opentelemetry.context.Scope;
3329
import org.junit.Test;
@@ -60,67 +56,4 @@ public void testCommitReturnsCommitStats() {
6056
verify(transaction).commitAsync();
6157
}
6258
}
63-
64-
@Test
65-
public void testRetryUsesPreviousTransactionIdOnMultiplexedSession() {
66-
// Set up mock transaction IDs
67-
final ByteString mockTransactionId = ByteString.copyFromUtf8("mockTransactionId");
68-
final ByteString mockPreviousTransactionId =
69-
ByteString.copyFromUtf8("mockPreviousTransactionId");
70-
71-
Span oTspan = mock(Span.class);
72-
ISpan span = new OpenTelemetrySpan(oTspan);
73-
when(oTspan.makeCurrent()).thenReturn(mock(Scope.class));
74-
// Mark the session as multiplexed.
75-
when(session.getIsMultiplexed()).thenReturn(true);
76-
77-
// Initialize a mock transaction with transactionId = null, previousTransactionId = null.
78-
transaction = mock(TransactionRunnerImpl.TransactionContextImpl.class);
79-
when(transaction.ensureTxnAsync()).thenReturn(ApiFutures.immediateFuture(null));
80-
when(session.newTransaction(eq(Options.fromTransactionOptions(Options.commitStats())), any()))
81-
.thenReturn(transaction);
82-
83-
// Simulate an ABORTED error being thrown when `commitAsync()` is called.
84-
doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.ABORTED, ""))
85-
.when(transaction)
86-
.commitAsync();
87-
88-
try (AsyncTransactionManagerImpl manager =
89-
new AsyncTransactionManagerImpl(session, span, Options.commitStats())) {
90-
manager.beginAsync();
91-
92-
// Verify that for the first transaction attempt, the `previousTransactionId` is
93-
// ByteString.EMPTY.
94-
// This is because no transaction has been previously aborted at this point.
95-
verify(session)
96-
.newTransaction(Options.fromTransactionOptions(Options.commitStats()), ByteString.EMPTY);
97-
assertThrows(AbortedException.class, manager::commitAsync);
98-
clearInvocations(session);
99-
100-
// Mock the transaction object to contain transactionID=null and
101-
// previousTransactionId=mockPreviousTransactionId
102-
when(transaction.getPreviousTransactionId()).thenReturn(mockPreviousTransactionId);
103-
manager.resetForRetryAsync();
104-
// Verify that in the first retry attempt, the `previousTransactionId`
105-
// (mockPreviousTransactionId) is passed to the new transaction.
106-
// This allows Spanner to retry the transaction using the ID of the aborted transaction.
107-
verify(session)
108-
.newTransaction(
109-
Options.fromTransactionOptions(Options.commitStats()), mockPreviousTransactionId);
110-
assertThrows(AbortedException.class, manager::commitAsync);
111-
clearInvocations(session);
112-
113-
// Mock the transaction object to contain transactionID=mockTransactionId and
114-
// previousTransactionId=mockPreviousTransactionId and transactionID = null
115-
transaction.transactionId = mockTransactionId;
116-
manager.resetForRetryAsync();
117-
// Verify that the latest `transactionId` (mockTransactionId) is used in the retry.
118-
// This ensures the retry logic is working as expected with the latest transaction ID.
119-
verify(session)
120-
.newTransaction(Options.fromTransactionOptions(Options.commitStats()), mockTransactionId);
121-
122-
when(transaction.rollbackAsync()).thenReturn(ApiFutures.immediateFuture(null));
123-
manager.closeAsync();
124-
}
125-
}
12659
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITAsyncExamplesTest.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,12 @@ public void runAsync() throws Exception {
253253
},
254254
executor);
255255
assertThat(insertCount.get()).isEqualTo(1L);
256+
if (env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()) {
257+
// The runAsync() method should only be called once on the runner.
258+
// However, due to a bug in regular sessions, it can be executed multiple times on the same
259+
// runner.
260+
runner = client.runAsync();
261+
}
256262
ApiFuture<Long> deleteCount =
257263
runner.runAsync(
258264
txn ->
@@ -299,6 +305,12 @@ public void runAsyncBatchUpdate() throws Exception {
299305
},
300306
executor);
301307
assertThat(insertCount.get()).asList().containsExactly(1L, 1L, 1L);
308+
if (env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW()) {
309+
// The runAsync() method should only be called once on the runner.
310+
// However, due to a bug in regular sessions, it can be executed multiple times on the same
311+
// runner.
312+
runner = client.runAsync();
313+
}
302314
ApiFuture<long[]> deleteCount =
303315
runner.runAsync(
304316
txn ->

google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchDmlTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.spanner.it;
1818

1919
import static com.google.common.truth.Truth.assertThat;
20+
import static org.junit.Assume.assumeFalse;
2021

2122
import com.google.api.gax.longrunning.OperationFuture;
2223
import com.google.cloud.spanner.Database;
@@ -84,6 +85,10 @@ public void dropTable() throws Exception {
8485

8586
@Test
8687
public void noStatementsInRequest() {
88+
// TODO(sriharshach): Remove this skip once backend support empty transactions to commit.
89+
assumeFalse(
90+
"Skipping for multiplexed sessions since it does not allow empty transactions to commit",
91+
isUsingMultiplexedSessionsForRW());
8792
final TransactionCallable<long[]> callable =
8893
transaction -> {
8994
List<Statement> stmts = new ArrayList<>();
@@ -252,4 +257,8 @@ public void largeBatchDml_withNonParameterisedStatements() {
252257
assertThat(actualRowCounts.length).isEqualTo(80);
253258
assertThat(expectedRowCounts).isEqualTo(actualRowCounts);
254259
}
260+
261+
boolean isUsingMultiplexedSessionsForRW() {
262+
return env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW();
263+
}
255264
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITBatchReadTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,9 @@ public static void setUpDatabase() throws Exception {
181181
totalSize = 0;
182182
}
183183
}
184-
dbClient.write(mutations);
184+
if (!mutations.isEmpty()) {
185+
dbClient.write(mutations);
186+
}
185187
}
186188
// Our read/queries are executed with some staleness.
187189
Thread.sleep(2 * STALENESS_MILLISEC);

google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITTransactionTest.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,10 @@ public void nestedSingleUseReadTxnThrows() {
464464
@Test
465465
public void nestedTxnSucceedsWhenAllowed() {
466466
assumeFalse("Emulator does not support multiple parallel transactions", isUsingEmulator());
467-
467+
// TODO(sriharshach): Remove this skip once backend support empty transactions to commit.
468+
assumeFalse(
469+
"Skipping for multiplexed sessions since it does not allow empty transactions to commit",
470+
isUsingMultiplexedSessionsForRW());
468471
client
469472
.readWriteTransaction()
470473
.allowNestedTransaction()
@@ -588,4 +591,8 @@ public void testTransactionRunnerReturnsCommitStats() {
588591
// MutationCount = 2 (2 columns).
589592
assertEquals(2L, runner.getCommitResponse().getCommitStats().getMutationCount());
590593
}
594+
595+
boolean isUsingMultiplexedSessionsForRW() {
596+
return env.getTestHelper().getOptions().getSessionPoolOptions().getUseMultiplexedSessionForRW();
597+
}
591598
}

0 commit comments

Comments
 (0)