Skip to content

Commit 9a5d86b

Browse files
authored
chore(spanner): add multiplexed session support for batch write (#3470)
* chore(spanner): add multiplexed session support for batch write * chore(spanner): lint fix
1 parent 1f143a4 commit 9a5d86b

File tree

6 files changed

+73
-10
lines changed

6 files changed

+73
-10
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractMultiplexedSessionDatabaseClient.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616

1717
package com.google.cloud.spanner;
1818

19-
import com.google.api.gax.rpc.ServerStream;
2019
import com.google.cloud.Timestamp;
21-
import com.google.cloud.spanner.Options.TransactionOption;
22-
import com.google.spanner.v1.BatchWriteResponse;
2320

2421
/**
2522
* Base class for the Multiplexed Session {@link DatabaseClient} implementation. Throws {@link
@@ -43,11 +40,4 @@ public String getDatabaseRole() {
4340
public Timestamp writeAtLeastOnce(Iterable<Mutation> mutations) throws SpannerException {
4441
return writeAtLeastOnceWithOptions(mutations).getCommitTimestamp();
4542
}
46-
47-
@Override
48-
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
49-
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
50-
throws SpannerException {
51-
throw new UnsupportedOperationException();
52-
}
5343
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DatabaseClientImpl.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,9 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
189189
throws SpannerException {
190190
ISpan span = tracer.spanBuilder(READ_WRITE_TRANSACTION, commonAttributes, options);
191191
try (IScope s = tracer.withSpan(span)) {
192+
if (canUseMultiplexedSessionsForRW() && getMultiplexedSessionDatabaseClient() != null) {
193+
return getMultiplexedSessionDatabaseClient().batchWriteAtLeastOnce(mutationGroups, options);
194+
}
192195
return runWithSessionRetry(session -> session.batchWriteAtLeastOnce(mutationGroups, options));
193196
} catch (RuntimeException e) {
194197
span.setStatus(e);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/DelayedMultiplexedSessionTransaction.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020

2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.ApiFutures;
23+
import com.google.api.gax.rpc.ServerStream;
2324
import com.google.cloud.Timestamp;
2425
import com.google.cloud.spanner.DelayedReadContext.DelayedReadOnlyTransaction;
2526
import com.google.cloud.spanner.MultiplexedSessionDatabaseClient.MultiplexedSessionTransaction;
2627
import com.google.cloud.spanner.Options.TransactionOption;
2728
import com.google.cloud.spanner.Options.UpdateOption;
2829
import com.google.common.util.concurrent.MoreExecutors;
30+
import com.google.spanner.v1.BatchWriteResponse;
2931
import java.util.concurrent.ExecutionException;
3032

3133
/**
@@ -164,6 +166,22 @@ public CommitResponse writeWithOptions(Iterable<Mutation> mutations, Transaction
164166
}
165167
}
166168

169+
/**
170+
* This is a blocking method, as the interface that it implements is also defined as a blocking
171+
* method.
172+
*/
173+
@Override
174+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
175+
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
176+
throws SpannerException {
177+
SessionReference sessionReference = getSessionReference();
178+
try (MultiplexedSessionTransaction transaction =
179+
new MultiplexedSessionTransaction(
180+
client, span, sessionReference, NO_CHANNEL_HINT, /* singleUse = */ true)) {
181+
return transaction.batchWriteAtLeastOnce(mutationGroups, options);
182+
}
183+
}
184+
167185
@Override
168186
public TransactionRunner readWriteTransaction(TransactionOption... options) {
169187
return new DelayedTransactionRunner(

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.core.ApiFuture;
2323
import com.google.api.core.ApiFutures;
2424
import com.google.api.core.SettableApiFuture;
25+
import com.google.api.gax.rpc.ServerStream;
2526
import com.google.cloud.Timestamp;
2627
import com.google.cloud.spanner.Options.TransactionOption;
2728
import com.google.cloud.spanner.Options.UpdateOption;
@@ -30,6 +31,7 @@
3031
import com.google.common.annotations.VisibleForTesting;
3132
import com.google.common.base.Preconditions;
3233
import com.google.common.util.concurrent.MoreExecutors;
34+
import com.google.spanner.v1.BatchWriteResponse;
3335
import com.google.spanner.v1.BeginTransactionRequest;
3436
import com.google.spanner.v1.RequestOptions;
3537
import com.google.spanner.v1.Transaction;
@@ -505,6 +507,14 @@ public CommitResponse writeAtLeastOnceWithOptions(
505507
.writeAtLeastOnceWithOptions(mutations, options);
506508
}
507509

510+
@Override
511+
public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
512+
Iterable<MutationGroup> mutationGroups, TransactionOption... options)
513+
throws SpannerException {
514+
return createMultiplexedSessionTransaction(/* singleUse = */ true)
515+
.batchWriteAtLeastOnce(mutationGroups, options);
516+
}
517+
508518
@Override
509519
public ReadContext singleUse() {
510520
return createMultiplexedSessionTransaction(/* singleUse = */ true).singleUse();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
321321
throw SpannerExceptionFactory.newSpannerException(e);
322322
} finally {
323323
span.end();
324+
onTransactionDone();
324325
}
325326
}
326327

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import com.google.api.core.ApiFuture;
3232
import com.google.api.core.ApiFutures;
33+
import com.google.api.gax.rpc.ServerStream;
3334
import com.google.cloud.NoCredentials;
3435
import com.google.cloud.Timestamp;
3536
import com.google.cloud.spanner.AsyncTransactionManager.AsyncTransactionStep;
@@ -45,6 +46,8 @@
4546
import com.google.common.collect.Lists;
4647
import com.google.common.util.concurrent.MoreExecutors;
4748
import com.google.protobuf.ByteString;
49+
import com.google.spanner.v1.BatchWriteRequest;
50+
import com.google.spanner.v1.BatchWriteResponse;
4851
import com.google.spanner.v1.BeginTransactionRequest;
4952
import com.google.spanner.v1.CommitRequest;
5053
import com.google.spanner.v1.ExecuteSqlRequest;
@@ -1635,6 +1638,44 @@ public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() {
16351638
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
16361639
}
16371640

1641+
@Test
1642+
public void testBatchWriteAtLeastOnce() {
1643+
DatabaseClientImpl client =
1644+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
1645+
1646+
Iterable<MutationGroup> MUTATION_GROUPS =
1647+
ImmutableList.of(
1648+
MutationGroup.of(
1649+
Mutation.newInsertBuilder("FOO1").set("ID").to(1L).set("NAME").to("Bar1").build(),
1650+
Mutation.newInsertBuilder("FOO2").set("ID").to(2L).set("NAME").to("Bar2").build()),
1651+
MutationGroup.of(
1652+
Mutation.newInsertBuilder("FOO3").set("ID").to(3L).set("NAME").to("Bar3").build(),
1653+
Mutation.newInsertBuilder("FOO4").set("ID").to(4L).set("NAME").to("Bar4").build()));
1654+
1655+
ServerStream<BatchWriteResponse> responseStream = client.batchWriteAtLeastOnce(MUTATION_GROUPS);
1656+
int idx = 0;
1657+
for (BatchWriteResponse response : responseStream) {
1658+
assertEquals(
1659+
response.getStatus(),
1660+
com.google.rpc.Status.newBuilder().setCode(com.google.rpc.Code.OK_VALUE).build());
1661+
assertEquals(response.getIndexesList(), ImmutableList.of(idx, idx + 1));
1662+
idx += 2;
1663+
}
1664+
1665+
assertNotNull(responseStream);
1666+
List<BatchWriteRequest> requests = mockSpanner.getRequestsOfType(BatchWriteRequest.class);
1667+
assertEquals(requests.size(), 1);
1668+
BatchWriteRequest request = requests.get(0);
1669+
assertTrue(mockSpanner.getSession(request.getSession()).getMultiplexed());
1670+
assertEquals(request.getMutationGroupsCount(), 2);
1671+
assertEquals(request.getRequestOptions().getPriority(), Priority.PRIORITY_UNSPECIFIED);
1672+
assertFalse(request.getExcludeTxnFromChangeStreams());
1673+
1674+
assertNotNull(client.multiplexedSessionDatabaseClient);
1675+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
1676+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
1677+
}
1678+
16381679
private void waitForSessionToBeReplaced(DatabaseClientImpl client) {
16391680
assertNotNull(client.multiplexedSessionDatabaseClient);
16401681
SessionReference sessionReference =

0 commit comments

Comments
 (0)