Skip to content

chore: add exclude_txn_from_change_streams option to Connection API #3104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
May 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions google-cloud-spanner/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -676,4 +676,16 @@
<method>boolean isEnableExtendedTracing()</method>
</difference>

<!-- Added ExcludeTxnFromChangeStreams -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>boolean isExcludeTxnFromChangeStreams()</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/spanner/connection/Connection</className>
<method>void setExcludeTxnFromChangeStreams(boolean)</method>
</difference>

</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ abstract class AbstractBaseUnitOfWork implements UnitOfWork {
private final StatementExecutor statementExecutor;
private final StatementTimeout statementTimeout;
protected final String transactionTag;
protected final boolean excludeTxnFromChangeStreams;
protected final RpcPriority rpcPriority;
protected final Span span;

Expand Down Expand Up @@ -107,6 +108,8 @@ abstract static class Builder<B extends Builder<?, T>, T extends AbstractBaseUni
private StatementExecutor statementExecutor;
private StatementTimeout statementTimeout = new StatementTimeout();
private String transactionTag;

private boolean excludeTxnFromChangeStreams;
private RpcPriority rpcPriority;
private Span span;

Expand Down Expand Up @@ -134,6 +137,11 @@ B setTransactionTag(@Nullable String tag) {
return self();
}

B setExcludeTxnFromChangeStreams(boolean excludeTxnFromChangeStreams) {
this.excludeTxnFromChangeStreams = excludeTxnFromChangeStreams;
return self();
}

B setRpcPriority(@Nullable RpcPriority rpcPriority) {
this.rpcPriority = rpcPriority;
return self();
Expand All @@ -152,6 +160,7 @@ B setSpan(@Nullable Span span) {
this.statementExecutor = builder.statementExecutor;
this.statementTimeout = builder.statementTimeout;
this.transactionTag = builder.transactionTag;
this.excludeTxnFromChangeStreams = builder.excludeTxnFromChangeStreams;
this.rpcPriority = builder.rpcPriority;
this.span = Preconditions.checkNotNull(builder.span);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,22 @@ default String getStatementTag() {
throw new UnsupportedOperationException();
}

/**
* Sets whether the next transaction should be excluded from all change streams with the DDL
* option `allow_txn_exclusion=true`
*/
default void setExcludeTxnFromChangeStreams(boolean excludeTxnFromChangeStreams) {
throw new UnsupportedOperationException();
}

/**
* Returns true if the next transaction should be excluded from all change streams with the DDL
* option `allow_txn_exclusion=true`
*/
default boolean isExcludeTxnFromChangeStreams() {
throw new UnsupportedOperationException();
}

/**
* @return <code>true</code> if this connection will automatically retry read/write transactions
* that abort. This method may only be called when the connection is in read/write
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
private String transactionTag;
private String statementTag;

private boolean excludeTxnFromChangeStreams;

private Duration maxCommitDelay;

/** Create a connection and register it in the SpannerPool. */
Expand Down Expand Up @@ -743,6 +745,24 @@ public void setStatementTag(String tag) {
this.statementTag = tag;
}

@Override
public boolean isExcludeTxnFromChangeStreams() {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(!isDdlBatchActive(), "This connection is in a DDL batch");
return excludeTxnFromChangeStreams;
}

@Override
public void setExcludeTxnFromChangeStreams(boolean excludeTxnFromChangeStreams) {
ConnectionPreconditions.checkState(!isClosed(), CLOSED_ERROR_MSG);
ConnectionPreconditions.checkState(
!isBatchActive(), "Cannot set exclude_txn_from_change_streams while in a batch");
ConnectionPreconditions.checkState(
!isTransactionStarted(),
"exclude_txn_from_change_streams cannot be set after the transaction has started");
this.excludeTxnFromChangeStreams = excludeTxnFromChangeStreams;
}

/**
* Throws an {@link SpannerException} with code {@link ErrorCode#FAILED_PRECONDITION} if the
* current state of this connection does not allow changing the setting for retryAbortsInternally.
Expand Down Expand Up @@ -899,6 +919,7 @@ private void setDefaultTransactionOptions() {
: UnitOfWorkType.READ_WRITE_TRANSACTION;
batchMode = BatchMode.NONE;
transactionTag = null;
excludeTxnFromChangeStreams = false;
} else {
popUnitOfWorkFromTransactionStack();
}
Expand Down Expand Up @@ -1768,22 +1789,29 @@ UnitOfWork createNewUnitOfWork(
if (isInternalMetadataQuery
|| (isAutocommit() && !isInTransaction() && !isInBatch())
|| forceSingleUse) {
return SingleUseTransaction.newBuilder()
.setInternalMetadataQuery(isInternalMetadataQuery)
.setDdlClient(ddlClient)
.setDatabaseClient(dbClient)
.setBatchClient(batchClient)
.setReadOnly(isReadOnly())
.setReadOnlyStaleness(readOnlyStaleness)
.setAutocommitDmlMode(autocommitDmlMode)
.setReturnCommitStats(returnCommitStats)
.setMaxCommitDelay(maxCommitDelay)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setSpan(
createSpanForUnitOfWork(
statementType == StatementType.DDL ? DDL_STATEMENT : SINGLE_USE_TRANSACTION))
.build();
SingleUseTransaction singleUseTransaction =
SingleUseTransaction.newBuilder()
.setInternalMetadataQuery(isInternalMetadataQuery)
.setDdlClient(ddlClient)
.setDatabaseClient(dbClient)
.setBatchClient(batchClient)
.setReadOnly(isReadOnly())
.setReadOnlyStaleness(readOnlyStaleness)
.setAutocommitDmlMode(autocommitDmlMode)
.setReturnCommitStats(returnCommitStats)
.setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams)
.setMaxCommitDelay(maxCommitDelay)
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setSpan(
createSpanForUnitOfWork(
statementType == StatementType.DDL ? DDL_STATEMENT : SINGLE_USE_TRANSACTION))
.build();
if (!isInternalMetadataQuery && !forceSingleUse) {
// Reset the transaction options after starting a single-use transaction.
setDefaultTransactionOptions();
}
return singleUseTransaction;
} else {
switch (getUnitOfWorkType()) {
case READ_ONLY_TRANSACTION:
Expand All @@ -1810,6 +1838,7 @@ UnitOfWork createNewUnitOfWork(
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setTransactionTag(transactionTag)
.setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams)
.setRpcPriority(rpcPriority)
.setSpan(createSpanForUnitOfWork(READ_WRITE_TRANSACTION))
.build();
Expand All @@ -1822,6 +1851,7 @@ UnitOfWork createNewUnitOfWork(
.setStatementTimeout(statementTimeout)
.withStatementExecutor(statementExecutor)
.setStatementTag(statementTag)
.setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams)
.setRpcPriority(rpcPriority)
.setSpan(createSpanForUnitOfWork(DML_BATCH))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ StatementResult statementSetDelayTransactionStartUntilFirstWrite(

StatementResult statementShowTransactionTag();

StatementResult statementSetExcludeTxnFromChangeStreams(Boolean excludeTxnFromChangeStreams);

StatementResult statementShowExcludeTxnFromChangeStreams();

StatementResult statementBeginTransaction();

StatementResult statementBeginPgTransaction(PgTransactionMode transactionMode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_DEFAULT_TRANSACTION_ISOLATION;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_DIRECTED_READ;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_MAX_COMMIT_DELAY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_MAX_PARTITIONED_PARALLELISM;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SET_MAX_PARTITIONS;
Expand All @@ -52,6 +53,7 @@
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_DATA_BOOST_ENABLED;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_DELAY_TRANSACTION_START_UNTIL_FIRST_WRITE;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_DIRECTED_READ;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_MAX_COMMIT_DELAY;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_MAX_PARTITIONED_PARALLELISM;
import static com.google.cloud.spanner.connection.StatementResult.ClientSideStatementType.SHOW_MAX_PARTITIONS;
Expand Down Expand Up @@ -409,6 +411,21 @@ public StatementResult statementShowTransactionTag() {
SHOW_TRANSACTION_TAG);
}

@Override
public StatementResult statementSetExcludeTxnFromChangeStreams(
Boolean excludeTxnFromChangeStreams) {
getConnection().setExcludeTxnFromChangeStreams(excludeTxnFromChangeStreams);
return noResult(SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS);
}

@Override
public StatementResult statementShowExcludeTxnFromChangeStreams() {
return resultSet(
String.format("%sEXCLUDE_TXN_FROM_CHANGE_STREAMS", getNamespace(connection.getDialect())),
getConnection().isExcludeTxnFromChangeStreams(),
SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS);
}

@Override
public StatementResult statementBeginTransaction() {
getConnection().beginTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,9 @@ private TransactionOption[] extractOptions(Builder builder) {
if (this.transactionTag != null) {
numOptions++;
}
if (this.excludeTxnFromChangeStreams) {
numOptions++;
}
if (this.rpcPriority != null) {
numOptions++;
}
Expand All @@ -265,6 +268,9 @@ private TransactionOption[] extractOptions(Builder builder) {
if (this.transactionTag != null) {
options[index++] = Options.tag(this.transactionTag);
}
if (this.excludeTxnFromChangeStreams) {
options[index++] = Options.excludeTxnFromChangeStreams();
}
if (this.rpcPriority != null) {
options[index++] = Options.priority(this.rpcPriority);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import com.google.spanner.v1.SpannerGrpc;
import io.opentelemetry.context.Scope;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.Callable;

/**
Expand Down Expand Up @@ -497,6 +498,9 @@ private TransactionRunner createWriteTransaction() {
if (returnCommitStats) {
numOptions++;
}
if (excludeTxnFromChangeStreams) {
numOptions++;
}
if (maxCommitDelay != null) {
numOptions++;
}
Expand All @@ -511,6 +515,9 @@ private TransactionRunner createWriteTransaction() {
if (returnCommitStats) {
options[index++] = Options.commitStats();
}
if (excludeTxnFromChangeStreams) {
options[index++] = Options.excludeTxnFromChangeStreams();
}
if (maxCommitDelay != null) {
options[index++] = Options.maxCommitDelay(maxCommitDelay);
}
Expand Down Expand Up @@ -580,10 +587,21 @@ private ApiFuture<ResultSet> analyzeTransactionalUpdateAsync(

private ApiFuture<Long> executePartitionedUpdateAsync(
CallType callType, final ParsedStatement update, final UpdateOption... options) {
final UpdateOption[] effectiveOptions;
if (excludeTxnFromChangeStreams) {
if (options.length == 0) {
effectiveOptions = new UpdateOption[] {Options.excludeTxnFromChangeStreams()};
} else {
effectiveOptions = Arrays.copyOf(options, options.length + 1);
effectiveOptions[effectiveOptions.length - 1] = Options.excludeTxnFromChangeStreams();
}
} else {
effectiveOptions = options;
}
Callable<Long> callable =
() -> {
try {
Long res = dbClient.executePartitionedUpdate(update.getStatement(), options);
Long res = dbClient.executePartitionedUpdate(update.getStatement(), effectiveOptions);
state = UnitOfWorkState.COMMITTED;
return res;
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ enum ClientSideStatementType {
SET_STATEMENT_TAG,
SHOW_TRANSACTION_TAG,
SET_TRANSACTION_TAG,
SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS,
SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS,
BEGIN,
COMMIT,
ROLLBACK,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@
"method": "statementShowTransactionTag",
"exampleStatements": ["show variable transaction_tag"]
},
{
"name": "SHOW VARIABLE EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"executorName": "ClientSideStatementNoParamExecutor",
"resultType": "RESULT_SET",
"statementType": "SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"regex": "(?is)\\A\\s*show\\s+variable\\s+exclude_txn_from_change_streams\\s*\\z",
"method": "statementShowExcludeTxnFromChangeStreams",
"exampleStatements": ["show variable exclude_txn_from_change_streams"]
},
{
"name": "SHOW VARIABLE RPC_PRIORITY",
"executorName": "ClientSideStatementNoParamExecutor",
Expand Down Expand Up @@ -497,6 +506,21 @@
"converterName": "ClientSideStatementValueConverters$StringValueConverter"
}
},
{
"name": "SET EXCLUDE_TXN_FROM_CHANGE_STREAMS = TRUE|FALSE",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"statementType": "SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"regex": "(?is)\\A\\s*set\\s+exclude_txn_from_change_streams\\s*(?:=)\\s*(.*)\\z",
"method": "statementSetExcludeTxnFromChangeStreams",
"exampleStatements": ["set exclude_txn_from_change_streams = true", "set exclude_txn_from_change_streams = false"],
"setStatement": {
"propertyName": "EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"separator": "=",
"allowedValues": "(TRUE|FALSE)",
"converterName": "ClientSideStatementValueConverters$BooleanConverter"
}
},
{
"name": "SET RPC_PRIORITY = 'HIGH'|'MEDIUM'|'LOW'|'NULL'",
"executorName": "ClientSideStatementSetExecutor",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,15 @@
"method": "statementShowTransactionTag",
"exampleStatements": ["show spanner.transaction_tag","show variable spanner.transaction_tag"]
},
{
"name": "SHOW [VARIABLE] SPANNER.EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"executorName": "ClientSideStatementNoParamExecutor",
"resultType": "RESULT_SET",
"statementType": "SHOW_EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"regex": "(?is)\\A\\s*show\\s+(?:variable\\s+)?spanner\\.exclude_txn_from_change_streams\\s*\\z",
"method": "statementShowExcludeTxnFromChangeStreams",
"exampleStatements": ["show spanner.exclude_txn_from_change_streams","show variable spanner.exclude_txn_from_change_streams"]
},
{
"name": "SHOW [VARIABLE] SPANNER.RPC_PRIORITY",
"executorName": "ClientSideStatementNoParamExecutor",
Expand Down Expand Up @@ -679,6 +688,21 @@
"converterName": "ClientSideStatementValueConverters$StringValueConverter"
}
},
{
"name": "SET SPANNER.EXCLUDE_TXN_FROM_CHANGE_STREAMS =|TO TRUE|FALSE",
"executorName": "ClientSideStatementSetExecutor",
"resultType": "NO_RESULT",
"statementType": "SET_EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"regex": "(?is)\\A\\s*set\\s+spanner\\.exclude_txn_from_change_streams(?:\\s*=\\s*|\\s+to\\s+)(.*)\\z",
"method": "statementSetReturnCommitStats",
"exampleStatements": ["set spanner.exclude_txn_from_change_streams = true", "set spanner.exclude_txn_from_change_streams = false", "set spanner.exclude_txn_from_change_streams to true", "set spanner.exclude_txn_from_change_streams to false"],
"setStatement": {
"propertyName": "SPANNER.EXCLUDE_TXN_FROM_CHANGE_STREAMS",
"separator": "(?:=|\\s+TO\\s+)",
"allowedValues": "(TRUE|FALSE)",
"converterName": "ClientSideStatementValueConverters$BooleanConverter"
}
},
{
"name": "SET SPANNER.RPC_PRIORITY =|TO 'HIGH'|'MEDIUM'|'LOW'|'NULL'",
"executorName": "ClientSideStatementSetExecutor",
Expand Down
Loading
Loading