Skip to content

Commit dfd9c4b

Browse files
chore: add exclude_txn_from_change_streams option to Connection API (#3104)
* chore: add exclude_txn_from_change_streams option to Connection API * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: add support for PDML --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent ddebbbb commit dfd9c4b

16 files changed

+2435
-241
lines changed

google-cloud-spanner/clirr-ignored-differences.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,4 +676,16 @@
676676
<method>boolean isEnableExtendedTracing()</method>
677677
</difference>
678678

679+
<!-- Added ExcludeTxnFromChangeStreams -->
680+
<difference>
681+
<differenceType>7012</differenceType>
682+
<className>com/google/cloud/spanner/connection/Connection</className>
683+
<method>boolean isExcludeTxnFromChangeStreams()</method>
684+
</difference>
685+
<difference>
686+
<differenceType>7012</differenceType>
687+
<className>com/google/cloud/spanner/connection/Connection</className>
688+
<method>void setExcludeTxnFromChangeStreams(boolean)</method>
689+
</difference>
690+
679691
</differences>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/AbstractBaseUnitOfWork.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ abstract class AbstractBaseUnitOfWork implements UnitOfWork {
7373
private final StatementExecutor statementExecutor;
7474
private final StatementTimeout statementTimeout;
7575
protected final String transactionTag;
76+
protected final boolean excludeTxnFromChangeStreams;
7677
protected final RpcPriority rpcPriority;
7778
protected final Span span;
7879

@@ -107,6 +108,8 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractBaseUni
107108
private StatementExecutor statementExecutor;
108109
private StatementTimeout statementTimeout = new StatementTimeout();
109110
private String transactionTag;
111+
112+
private boolean excludeTxnFromChangeStreams;
110113
private RpcPriority rpcPriority;
111114
private Span span;
112115

@@ -134,6 +137,11 @@ B setTransactionTag(@Nullable String tag) {
134137
return self();
135138
}
136139

140+
B setExcludeTxnFromChangeStreams(boolean excludeTxnFromChangeStreams) {
141+
this.excludeTxnFromChangeStreams = excludeTxnFromChangeStreams;
142+
return self();
143+
}
144+
137145
B setRpcPriority(@Nullable RpcPriority rpcPriority) {
138146
this.rpcPriority = rpcPriority;
139147
return self();
@@ -152,6 +160,7 @@ B setSpan(@Nullable Span span) {
152160
this.statementExecutor = builder.statementExecutor;
153161
this.statementTimeout = builder.statementTimeout;
154162
this.transactionTag = builder.transactionTag;
163+
this.excludeTxnFromChangeStreams = builder.excludeTxnFromChangeStreams;
155164
this.rpcPriority = builder.rpcPriority;
156165
this.span = Preconditions.checkNotNull(builder.span);
157166
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/Connection.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,22 @@ default String getStatementTag() {
387387
throw new UnsupportedOperationException();
388388
}
389389

390+
/**
391+
* Sets whether the next transaction should be excluded from all change streams with the DDL
392+
* option `allow_txn_exclusion=true`
393+
*/
394+
default void setExcludeTxnFromChangeStreams(boolean excludeTxnFromChangeStreams) {
395+
throw new UnsupportedOperationException();
396+
}
397+
398+
/**
399+
* Returns true if the next transaction should be excluded from all change streams with the DDL
400+
* option `allow_txn_exclusion=true`
401+
*/
402+
default boolean isExcludeTxnFromChangeStreams() {
403+
throw new UnsupportedOperationException();
404+
}
405+
390406
/**
391407
* @return <code>true</code> if this connection will automatically retry read/write transactions
392408
* that abort. This method may only be called when the connection is in read/write

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionImpl.java

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,8 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
269269
private String transactionTag;
270270
private String statementTag;
271271

272+
private boolean excludeTxnFromChangeStreams;
273+
272274
private Duration maxCommitDelay;
273275

274276
/** Create a connection and register it in the SpannerPool. */
@@ -743,6 +745,24 @@ public void setStatementTag(String tag) {
743745
this.statementTag = tag;
744746
}
745747

748+
@Override
749+
public boolean isExcludeTxnFromChangeStreams() {
750+
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
751+
ConnectionPreconditions.checkState(!isDdlBatchActive(), "This connection is in a DDL batch");
752+
return excludeTxnFromChangeStreams;
753+
}
754+
755+
@Override
756+
public void setExcludeTxnFromChangeStreams(boolean excludeTxnFromChangeStreams) {
757+
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
758+
ConnectionPreconditions.checkState(
759+
!isBatchActive(), "Cannot set exclude_txn_from_change_streams while in a batch");
760+
ConnectionPreconditions.checkState(
761+
!isTransactionStarted(),
762+
"exclude_txn_from_change_streams cannot be set after the transaction has started");
763+
this.excludeTxnFromChangeStreams = excludeTxnFromChangeStreams;
764+
}
765+
746766
/**
747767
* Throws an {@link SpannerException} with code {@link ErrorCode#FAILED_PRECONDITION} if the
748768
* current state of this connection does not allow changing the setting for retryAbortsInternally.
@@ -899,6 +919,7 @@ private void setDefaultTransactionOptions() {
899919
: UnitOfWorkType.READ_WRITE_TRANSACTION;
900920
batchMode = BatchMode.NONE;
901921
transactionTag = null;
922+
excludeTxnFromChangeStreams = false;
902923
} else {
903924
popUnitOfWorkFromTransactionStack();
904925
}
@@ -1768,22 +1789,29 @@ UnitOfWork createNewUnitOfWork(
17681789
if (isInternalMetadataQuery
17691790
|| (isAutocommit() && !isInTransaction() && !isInBatch())
17701791
|| forceSingleUse) {
1771-
return SingleUseTransaction.newBuilder()
1772-
.setInternalMetadataQuery(isInternalMetadataQuery)
1773-
.setDdlClient(ddlClient)
1774-
.setDatabaseClient(dbClient)
1775-
.setBatchClient(batchClient)
1776-
.setReadOnly(isReadOnly())
1777-
.setReadOnlyStaleness(readOnlyStaleness)
1778-
.setAutocommitDmlMode(autocommitDmlMode)
1779-
.setReturnCommitStats(returnCommitStats)
1780-
.setMaxCommitDelay(maxCommitDelay)
1781-
.setStatementTimeout(statementTimeout)
1782-
.withStatementExecutor(statementExecutor)
1783-
.setSpan(
1784-
createSpanForUnitOfWork(
1785-
statementType == StatementType.DDL ? DDL_STATEMENT : SINGLE_USE_TRANSACTION))
1786-
.build();
1792+
SingleUseTransaction singleUseTransaction =
1793+
SingleUseTransaction.newBuilder()
1794+
.setInternalMetadataQuery(isInternalMetadataQuery)
1795+
.setDdlClient(ddlClient)
1796+
.setDatabaseClient(dbClient)
1797+
.setBatchClient(batchClient)
1798+
.setReadOnly(isReadOnly())
1799+
.setReadOnlyStaleness(readOnlyStaleness)
1800+
.setAutocommitDmlMode(autocommitDmlMode)
1801+
.setReturnCommitStats(returnCommitStats)
1802+
.setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams)
1803+
.setMaxCommitDelay(maxCommitDelay)
1804+
.setStatementTimeout(statementTimeout)
1805+
.withStatementExecutor(statementExecutor)
1806+
.setSpan(
1807+
createSpanForUnitOfWork(
1808+
statementType == StatementType.DDL ? DDL_STATEMENT : SINGLE_USE_TRANSACTION))
1809+
.build();
1810+
if (!isInternalMetadataQuery && !forceSingleUse) {
1811+
// Reset the transaction options after starting a single-use transaction.
1812+
setDefaultTransactionOptions();
1813+
}
1814+
return singleUseTransaction;
17871815
} else {
17881816
switch (getUnitOfWorkType()) {
17891817
case READ_ONLY_TRANSACTION:
@@ -1810,6 +1838,7 @@ UnitOfWork createNewUnitOfWork(
18101838
.setStatementTimeout(statementTimeout)
18111839
.withStatementExecutor(statementExecutor)
18121840
.setTransactionTag(transactionTag)
1841+
.setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams)
18131842
.setRpcPriority(rpcPriority)
18141843
.setSpan(createSpanForUnitOfWork(READ_WRITE_TRANSACTION))
18151844
.build();
@@ -1822,6 +1851,7 @@ UnitOfWork createNewUnitOfWork(
18221851
.setStatementTimeout(statementTimeout)
18231852
.withStatementExecutor(statementExecutor)
18241853
.setStatementTag(statementTag)
1854+
.setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams)
18251855
.setRpcPriority(rpcPriority)
18261856
.setSpan(createSpanForUnitOfWork(DML_BATCH))
18271857
.build();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionStatementExecutor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,10 @@ StatementResult statementSetDelayTransactionStartUntilFirstWrite(
9999

100100
StatementResult statementShowTransactionTag();
101101

102+
StatementResult statementSetExcludeTxnFromChangeStreams(Boolean excludeTxnFromChangeStreams);
103+
104+
StatementResult statementShowExcludeTxnFromChangeStreams();
105+
102106
StatementResult statementBeginTransaction();
103107

104108
StatementResult statementBeginPgTransaction(PgTransactionMode transactionMode);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ConnectionStatementExecutorImpl.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_DEFAULT_TRANSACTION_ISOLATION;
3030
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
3131
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_DIRECTED_READ;
32+
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS;
3233
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_MAX_COMMIT_DELAY;
3334
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_MAX_PARTITIONED_PARALLELISM;
3435
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_MAX_PARTITIONS;
@@ -52,6 +53,7 @@
5253
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_DATA_BOOST_ENABLED;
5354
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
5455
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_DIRECTED_READ;
56+
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS;
5557
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_MAX_COMMIT_DELAY;
5658
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_MAX_PARTITIONED_PARALLELISM;
5759
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_MAX_PARTITIONS;
@@ -409,6 +411,21 @@ public StatementResult statementShowTransactionTag() {
409411
SHOW_TRANSACTION_TAG);
410412
}
411413

414+
@Override
415+
public StatementResult statementSetExcludeTxnFromChangeStreams(
416+
Boolean excludeTxnFromChangeStreams) {
417+
getConnection().setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams);
418+
return noResult(SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS);
419+
}
420+
421+
@Override
422+
public StatementResult statementShowExcludeTxnFromChangeStreams() {
423+
return resultSet(
424+
String.format("%sEXCLUDE_TXN_FROM_CHANGE_STREAMS", getNamespace(connection.getDialect())),
425+
getConnection().isExcludeTxnFromChangeStreams(),
426+
SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS);
427+
}
428+
412429
@Override
413430
public StatementResult statementBeginTransaction() {
414431
getConnection().beginTransaction();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,9 @@ private TransactionOption[] extractOptions(Builder builder) {
251251
if (this.transactionTag != null) {
252252
numOptions++;
253253
}
254+
if (this.excludeTxnFromChangeStreams) {
255+
numOptions++;
256+
}
254257
if (this.rpcPriority != null) {
255258
numOptions++;
256259
}
@@ -265,6 +268,9 @@ private TransactionOption[] extractOptions(Builder builder) {
265268
if (this.transactionTag != null) {
266269
options[index++] = Options.tag(this.transactionTag);
267270
}
271+
if (this.excludeTxnFromChangeStreams) {
272+
options[index++] = Options.excludeTxnFromChangeStreams();
273+
}
268274
if (this.rpcPriority != null) {
269275
options[index++] = Options.priority(this.rpcPriority);
270276
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/SingleUseTransaction.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import com.google.spanner.v1.SpannerGrpc;
5656
import io.opentelemetry.context.Scope;
5757
import java.time.Duration;
58+
import java.util.Arrays;
5859
import java.util.concurrent.Callable;
5960

6061
/**
@@ -497,6 +498,9 @@ private TransactionRunner createWriteTransaction() {
497498
if (returnCommitStats) {
498499
numOptions++;
499500
}
501+
if (excludeTxnFromChangeStreams) {
502+
numOptions++;
503+
}
500504
if (maxCommitDelay != null) {
501505
numOptions++;
502506
}
@@ -511,6 +515,9 @@ private TransactionRunner createWriteTransaction() {
511515
if (returnCommitStats) {
512516
options[index++] = Options.commitStats();
513517
}
518+
if (excludeTxnFromChangeStreams) {
519+
options[index++] = Options.excludeTxnFromChangeStreams();
520+
}
514521
if (maxCommitDelay != null) {
515522
options[index++] = Options.maxCommitDelay(maxCommitDelay);
516523
}
@@ -580,10 +587,21 @@ private ApiFuture<ResultSet> analyzeTransactionalUpdateAsync(
580587

581588
private ApiFuture<Long> executePartitionedUpdateAsync(
582589
CallType callType, final ParsedStatement update, final UpdateOption... options) {
590+
final UpdateOption[] effectiveOptions;
591+
if (excludeTxnFromChangeStreams) {
592+
if (options.length == 0) {
593+
effectiveOptions = new UpdateOption[] {Options.excludeTxnFromChangeStreams()};
594+
} else {
595+
effectiveOptions = Arrays.copyOf(options, options.length + 1);
596+
effectiveOptions[effectiveOptions.length - 1] = Options.excludeTxnFromChangeStreams();
597+
}
598+
} else {
599+
effectiveOptions = options;
600+
}
583601
Callable<Long> callable =
584602
() -> {
585603
try {
586-
Long res = dbClient.executePartitionedUpdate(update.getStatement(), options);
604+
Long res = dbClient.executePartitionedUpdate(update.getStatement(), effectiveOptions);
587605
state = UnitOfWorkState.COMMITTED;
588606
return res;
589607
} catch (Throwable t) {

google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/StatementResult.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,8 @@ enum ClientSideStatementType {
7979
SET_STATEMENT_TAG,
8080
SHOW_TRANSACTION_TAG,
8181
SET_TRANSACTION_TAG,
82+
SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS,
83+
SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS,
8284
BEGIN,
8385
COMMIT,
8486
ROLLBACK,

google-cloud-spanner/src/main/resources/com/google/cloud/spanner/connection/ClientSideStatements.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,15 @@
149149
"method": "statementShowTransactionTag",
150150
"exampleStatements": ["show variable transaction_tag"]
151151
},
152+
{
153+
"name": "SHOW VARIABLE EXCLUDE_TXN_FROM_CHANGE_STREAMS",
154+
"executorName": "ClientSideStatementNoParamExecutor",
155+
"resultType": "RESULT_SET",
156+
"statementType": "SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS",
157+
"regex": "(?is)\\A\\s*show\\s+variable\\s+exclude_txn_from_change_streams\\s*\\z",
158+
"method": "statementShowExcludeTxnFromChangeStreams",
159+
"exampleStatements": ["show variable exclude_txn_from_change_streams"]
160+
},
152161
{
153162
"name": "SHOW VARIABLE RPC_PRIORITY",
154163
"executorName": "ClientSideStatementNoParamExecutor",
@@ -497,6 +506,21 @@
497506
"converterName": "ClientSideStatementValueConverters$StringValueConverter"
498507
}
499508
},
509+
{
510+
"name": "SET EXCLUDE_TXN_FROM_CHANGE_STREAMS = TRUE|FALSE",
511+
"executorName": "ClientSideStatementSetExecutor",
512+
"resultType": "NO_RESULT",
513+
"statementType": "SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS",
514+
"regex": "(?is)\\A\\s*set\\s+exclude_txn_from_change_streams\\s*(?:=)\\s*(.*)\\z",
515+
"method": "statementSetExcludeTxnFromChangeStreams",
516+
"exampleStatements": ["set exclude_txn_from_change_streams = true", "set exclude_txn_from_change_streams = false"],
517+
"setStatement": {
518+
"propertyName": "EXCLUDE_TXN_FROM_CHANGE_STREAMS",
519+
"separator": "=",
520+
"allowedValues": "(TRUE|FALSE)",
521+
"converterName": "ClientSideStatementValueConverters$BooleanConverter"
522+
}
523+
},
500524
{
501525
"name": "SET RPC_PRIORITY = 'HIGH'|'MEDIUM'|'LOW'|'NULL'",
502526
"executorName": "ClientSideStatementSetExecutor",

google-cloud-spanner/src/main/resources/com/google/cloud/spanner/connection/PG_ClientSideStatements.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,15 @@
149149
"method": "statementShowTransactionTag",
150150
"exampleStatements": ["show spanner.transaction_tag","show variable spanner.transaction_tag"]
151151
},
152+
{
153+
"name": "SHOW [VARIABLE] SPANNER.EXCLUDE_TXN_FROM_CHANGE_STREAMS",
154+
"executorName": "ClientSideStatementNoParamExecutor",
155+
"resultType": "RESULT_SET",
156+
"statementType": "SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS",
157+
"regex": "(?is)\\A\\s*show\\s+(?:variable\\s+)?spanner\\.exclude_txn_from_change_streams\\s*\\z",
158+
"method": "statementShowExcludeTxnFromChangeStreams",
159+
"exampleStatements": ["show spanner.exclude_txn_from_change_streams","show variable spanner.exclude_txn_from_change_streams"]
160+
},
152161
{
153162
"name": "SHOW [VARIABLE] SPANNER.RPC_PRIORITY",
154163
"executorName": "ClientSideStatementNoParamExecutor",
@@ -679,6 +688,21 @@
679688
"converterName": "ClientSideStatementValueConverters$StringValueConverter"
680689
}
681690
},
691+
{
692+
"name": "SET SPANNER.EXCLUDE_TXN_FROM_CHANGE_STREAMS =|TO TRUE|FALSE",
693+
"executorName": "ClientSideStatementSetExecutor",
694+
"resultType": "NO_RESULT",
695+
"statementType": "SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS",
696+
"regex": "(?is)\\A\\s*set\\s+spanner\\.exclude_txn_from_change_streams(?:\\s*=\\s*|\\s+to\\s+)(.*)\\z",
697+
"method": "statementSetReturnCommitStats",
698+
"exampleStatements": ["set spanner.exclude_txn_from_change_streams = true", "set spanner.exclude_txn_from_change_streams = false", "set spanner.exclude_txn_from_change_streams to true", "set spanner.exclude_txn_from_change_streams to false"],
699+
"setStatement": {
700+
"propertyName": "SPANNER.EXCLUDE_TXN_FROM_CHANGE_STREAMS",
701+
"separator": "(?:=|\\s+TO\\s+)",
702+
"allowedValues": "(TRUE|FALSE)",
703+
"converterName": "ClientSideStatementValueConverters$BooleanConverter"
704+
}
705+
},
682706
{
683707
"name": "SET SPANNER.RPC_PRIORITY =|TO 'HIGH'|'MEDIUM'|'LOW'|'NULL'",
684708
"executorName": "ClientSideStatementSetExecutor",

0 commit comments

Comments
 (0)