Skip to content

Commit 46e0d3e

Browse files
committed
Add support for transaction-level exclusion from change streams
1 parent 2392afe commit 46e0d3e

File tree

6 files changed

+370
-9
lines changed

6 files changed

+370
-9
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ public interface ReadOption {}
6161
public interface ReadQueryUpdateTransactionOption
6262
extends ReadOption, QueryOption, UpdateOption, TransactionOption {}
6363

64+
/** Marker interface to mark options applicable to Update and Write operations */
65+
public interface UpdateTransactionOption extends UpdateOption, TransactionOption {}
66+
6467
/**
6568
* Marker interface to mark options applicable to Create, Update and Delete operations in admin
6669
* API.
@@ -108,6 +111,17 @@ public static TransactionOption commitStats() {
108111
public static TransactionOption optimisticLock() {
109112
return OPTIMISTIC_LOCK_OPTION;
110113
}
114+
115+
/**
116+
* Specifying this instructs the transaction to be excluded from being recorded in change streams
117+
* with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
118+
* being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
119+
* unset.
120+
*/
121+
public static UpdateTransactionOption excludeTxnFromChangeStreams() {
122+
return EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION;
123+
}
124+
111125
/**
112126
* Specifying this will cause the read to yield at most this many rows. This should be greater
113127
* than 0.
@@ -282,6 +296,18 @@ void appendToOptions(Options options) {
282296

283297
static final OptimisticLockOption OPTIMISTIC_LOCK_OPTION = new OptimisticLockOption();
284298

299+
/** Option to request the transaction to be excluded from change streams. */
300+
static final class ExcludeTxnFromChangeStreamsOption extends InternalOption
301+
implements UpdateTransactionOption {
302+
@Override
303+
void appendToOptions(Options options) {
304+
options.withExcludeTxnFromChangeStreams = true;
305+
}
306+
}
307+
308+
static final ExcludeTxnFromChangeStreamsOption EXCLUDE_TXN_FROM_CHANGE_STREAMS_OPTION =
309+
new ExcludeTxnFromChangeStreamsOption();
310+
285311
/** Option pertaining to flow control. */
286312
static final class FlowControlOption extends InternalOption implements ReadAndQueryOption {
287313
final int prefetchChunks;
@@ -406,6 +432,7 @@ void appendToOptions(Options options) {
406432
private String etag;
407433
private Boolean validateOnly;
408434
private Boolean withOptimisticLock;
435+
private Boolean withExcludeTxnFromChangeStreams;
409436
private Boolean dataBoostEnabled;
410437
private DirectedReadOptions directedReadOptions;
411438
private DecodeMode decodeMode;
@@ -509,6 +536,10 @@ Boolean withOptimisticLock() {
509536
return withOptimisticLock;
510537
}
511538

539+
Boolean withExcludeTxnFromChangeStreams() {
540+
return withExcludeTxnFromChangeStreams;
541+
}
542+
512543
boolean hasDataBoostEnabled() {
513544
return dataBoostEnabled != null;
514545
}
@@ -572,6 +603,11 @@ public String toString() {
572603
if (withOptimisticLock != null) {
573604
b.append("withOptimisticLock: ").append(withOptimisticLock).append(' ');
574605
}
606+
if (withExcludeTxnFromChangeStreams != null) {
607+
b.append("withExcludeTxnFromChangeStreams: ")
608+
.append(withExcludeTxnFromChangeStreams)
609+
.append(' ');
610+
}
575611
if (dataBoostEnabled != null) {
576612
b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' ');
577613
}
@@ -617,6 +653,7 @@ public boolean equals(Object o) {
617653
&& Objects.equals(etag(), that.etag())
618654
&& Objects.equals(validateOnly(), that.validateOnly())
619655
&& Objects.equals(withOptimisticLock(), that.withOptimisticLock())
656+
&& Objects.equals(withExcludeTxnFromChangeStreams(), that.withExcludeTxnFromChangeStreams())
620657
&& Objects.equals(dataBoostEnabled(), that.dataBoostEnabled())
621658
&& Objects.equals(directedReadOptions(), that.directedReadOptions());
622659
}
@@ -663,6 +700,9 @@ public int hashCode() {
663700
if (withOptimisticLock != null) {
664701
result = 31 * result + withOptimisticLock.hashCode();
665702
}
703+
if (withExcludeTxnFromChangeStreams != null) {
704+
result = 31 * result + withExcludeTxnFromChangeStreams.hashCode();
705+
}
666706
if (dataBoostEnabled != null) {
667707
result = 31 * result + dataBoostEnabled.hashCode();
668708
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/PartitionedDmlTransaction.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ private ExecuteSqlRequest resumeOrRestartRequest(
167167

168168
@VisibleForTesting
169169
ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Options options) {
170-
ByteString transactionId = initTransaction();
170+
ByteString transactionId = initTransaction(options);
171171

172172
final TransactionSelector transactionSelector =
173173
TransactionSelector.newBuilder().setId(transactionId).build();
@@ -195,13 +195,15 @@ ExecuteSqlRequest newTransactionRequestFrom(final Statement statement, final Opt
195195
return builder.build();
196196
}
197197

198-
private ByteString initTransaction() {
198+
private ByteString initTransaction(final Options options) {
199199
final BeginTransactionRequest request =
200200
BeginTransactionRequest.newBuilder()
201201
.setSession(session.getName())
202202
.setOptions(
203203
TransactionOptions.newBuilder()
204-
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance()))
204+
.setPartitionedDml(TransactionOptions.PartitionedDml.getDefaultInstance())
205+
.setExcludeTxnFromChangeStreams(
206+
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE))
205207
.build();
206208
Transaction tx = rpc.beginTransaction(request, session.getOptions(), true);
207209
if (tx.getId().isEmpty()) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,11 +69,16 @@ static void throwIfTransactionsPending() {
6969
}
7070

7171
static TransactionOptions createReadWriteTransactionOptions(Options options) {
72+
TransactionOptions.Builder transactionOptions = TransactionOptions.newBuilder();
73+
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
74+
transactionOptions.setExcludeTxnFromChangeStreams(true);
75+
}
7276
TransactionOptions.ReadWrite.Builder readWrite = TransactionOptions.ReadWrite.newBuilder();
7377
if (options.withOptimisticLock() == Boolean.TRUE) {
7478
readWrite.setReadLockMode(TransactionOptions.ReadWrite.ReadLockMode.OPTIMISTIC);
7579
}
76-
return TransactionOptions.newBuilder().setReadWrite(readWrite).build();
80+
transactionOptions.setReadWrite(readWrite);
81+
return transactionOptions.build();
7782
}
7883

7984
/**
@@ -181,10 +186,16 @@ public CommitResponse writeAtLeastOnceWithOptions(
181186
CommitRequest.newBuilder()
182187
.setSession(name)
183188
.setReturnCommitStats(options.withCommitStats())
184-
.addAllMutations(mutationsProto)
185-
.setSingleUseTransaction(
186-
TransactionOptions.newBuilder()
187-
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
189+
.addAllMutations(mutationsProto);
190+
191+
TransactionOptions.Builder transactionOptionsBuilder =
192+
TransactionOptions.newBuilder()
193+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance());
194+
if (options.withExcludeTxnFromChangeStreams() == Boolean.TRUE) {
195+
transactionOptionsBuilder.setExcludeTxnFromChangeStreams(true);
196+
}
197+
requestBuilder.setSingleUseTransaction(transactionOptionsBuilder);
198+
188199
if (options.hasMaxCommitDelay()) {
189200
requestBuilder.setMaxCommitDelay(
190201
Duration.newBuilder()
@@ -238,6 +249,10 @@ public ServerStream<BatchWriteResponse> batchWriteAtLeastOnce(
238249
if (batchWriteRequestOptions != null) {
239250
requestBuilder.setRequestOptions(batchWriteRequestOptions);
240251
}
252+
if (Options.fromTransactionOptions(transactionOptions).withExcludeTxnFromChangeStreams()
253+
== Boolean.TRUE) {
254+
requestBuilder.setExcludeTxnFromChangeStreams(true);
255+
}
241256
ISpan span = tracer.spanBuilder(SpannerImpl.BATCH_WRITE);
242257
try (IScope s = tracer.withSpan(span)) {
243258
return spanner.getRpc().batchWriteAtLeastOnce(requestBuilder.build(), this.options);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,9 @@ public void run() {
371371
if (transactionId == null && transactionIdFuture == null) {
372372
requestBuilder.setSingleUseTransaction(
373373
TransactionOptions.newBuilder()
374-
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()));
374+
.setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())
375+
.setExcludeTxnFromChangeStreams(
376+
options.withExcludeTxnFromChangeStreams() == Boolean.TRUE));
375377
} else {
376378
requestBuilder.setTransactionId(
377379
transactionId == null

0 commit comments

Comments
 (0)