Skip to content

Commit d6e8b7a

Browse files
committed
docs(samples): add samples and tests for change streams txn exclusion
1 parent e4ee19d commit d6e8b7a

File tree

3 files changed

+270
-1
lines changed

3 files changed

+270
-1
lines changed

samples/snippets/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
<dependency>
3535
<groupId>com.google.cloud</groupId>
3636
<artifactId>libraries-bom</artifactId>
37-
<version>26.37.0</version>
37+
<version>26.38.0</version>
3838
<type>pom</type>
3939
<scope>import</scope>
4040
</dependency>
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
/*
2+
* Copyright 2021 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.spanner;
18+
19+
import com.google.cloud.spanner.CommitResponse;
20+
import com.google.cloud.spanner.DatabaseClient;
21+
import com.google.cloud.spanner.DatabaseId;
22+
import com.google.cloud.spanner.Mutation;
23+
import com.google.cloud.spanner.Options;
24+
import com.google.cloud.spanner.Spanner;
25+
import com.google.cloud.spanner.SpannerOptions;
26+
import com.google.cloud.spanner.Statement;
27+
import com.google.cloud.spanner.TransactionContext;
28+
import com.google.cloud.spanner.TransactionManager;
29+
import java.util.Collections;
30+
import com.google.spanner.v1.BatchWriteResponse;
31+
import com.google.api.gax.rpc.ServerStream;
32+
import com.google.rpc.Code;
33+
import com.google.cloud.spanner.MutationGroup;
34+
import com.google.common.collect.ImmutableList;
35+
36+
/** Sample showing how to set exclude transaction from change streams in different write requests. */
37+
public class ChangeStreamsTxnExclusionSample {
38+
39+
static void setExcludeTxnFromChangeStreams() {
40+
// TODO(developer): Replace these variables before running the sample.
41+
final String projectId = "span-cloud-testing";
42+
final String instanceId = "weideng-test";
43+
final String databaseId = "my-database";
44+
45+
try (Spanner spanner =
46+
SpannerOptions.newBuilder().setProjectId(projectId).build().getService()) {
47+
final DatabaseClient databaseClient =
48+
spanner.getDatabaseClient(DatabaseId.of(projectId, instanceId, databaseId));
49+
rwTxnExcludedFromChangeStreams(databaseClient);
50+
}
51+
}
52+
53+
// [START spanner_set_exclude_txn_from_change_streams]
54+
static void rwTxnExcludedFromChangeStreams(DatabaseClient client) {
55+
// Exclude the transaction from allowed tracking change streams with alloww_txn_exclusion=true.
56+
// This exclusion will be applied to all the individual operations inside this transaction.
57+
client
58+
.readWriteTransaction(Options.excludeTxnFromChangeStreams())
59+
.run(
60+
transaction -> {
61+
transaction.executeUpdate(
62+
Statement.of(
63+
"INSERT Singers (SingerId, FirstName, LastName)\n"
64+
+ "VALUES (1341, 'Virginia', 'Watson')"));
65+
System.out.println("New singer inserted.");
66+
67+
transaction.executeUpdate(
68+
Statement.of("UPDATE Singers SET FirstName = 'Hi' WHERE SingerId = 111"));
69+
System.out.println("Singer first name updated.");
70+
71+
return null;
72+
});
73+
}
74+
75+
static void writeExcludedFromChangeStreams(DatabaseClient client) {
76+
CommitResponse response =
77+
client.writeWithOptions(
78+
Collections.singletonList(
79+
Mutation.newInsertOrUpdateBuilder("Singers")
80+
.set("SingerId")
81+
.to(4520)
82+
.set("FirstName")
83+
.to("Lauren")
84+
.set("LastName")
85+
.to("Lee")
86+
.build()),
87+
Options.excludeTxnFromChangeStreams());
88+
System.out.println("New singer inserted.");
89+
}
90+
91+
static void writeAtLeastOnceExcludedFromChangeStreams(DatabaseClient client) {
92+
CommitResponse response =
93+
client.writeAtLeastOnceWithOptions(
94+
Collections.singletonList(
95+
Mutation.newInsertOrUpdateBuilder("Singers")
96+
.set("SingerId")
97+
.to(45201)
98+
.set("FirstName")
99+
.to("Laura")
100+
.set("LastName")
101+
.to("Johnson")
102+
.build()),
103+
Options.excludeTxnFromChangeStreams());
104+
System.out.println("New singer inserted.");
105+
}
106+
107+
static void batchWriteAtLeastOnceExcludedFromChangeStreams(DatabaseClient client) {
108+
ServerStream<BatchWriteResponse> responses =
109+
client.batchWriteAtLeastOnce(
110+
ImmutableList.of(MutationGroup.of(
111+
Mutation.newInsertOrUpdateBuilder("Singers")
112+
.set("SingerId")
113+
.to(116)
114+
.set("FirstName")
115+
.to("Scarlet")
116+
.set("LastName")
117+
.to("Terry")
118+
.build())),
119+
Options.excludeTxnFromChangeStreams());
120+
for (BatchWriteResponse response : responses) {
121+
if (response.getStatus().getCode() != Code.OK_VALUE) {
122+
System.out.printf(
123+
"Mutation group could not be applied with error code %s and "
124+
+ "error message %s",
125+
Code.forNumber(response.getStatus().getCode()), response.getStatus().getMessage());
126+
}
127+
}
128+
System.out.println("New singer inserted.");
129+
}
130+
131+
static void pdmlExcludedFromChangeStreams(DatabaseClient client) {
132+
client.executePartitionedUpdate(
133+
Statement.of("DELETE FROM Singers WHERE TRUE"), Options.excludeTxnFromChangeStreams());
134+
System.out.println("Singers deleted.");
135+
}
136+
137+
static void txnManagerExcludedFromChangeStreams(DatabaseClient client) {
138+
try (TransactionManager manager = client.transactionManager(Options.excludeTxnFromChangeStreams())) {
139+
TransactionContext transaction = manager.begin();
140+
transaction.buffer(
141+
Mutation.newInsertOrUpdateBuilder("Singers")
142+
.set("SingerId")
143+
.to(888)
144+
.set("FirstName")
145+
.to("Johnson")
146+
.set("LastName")
147+
.to("Doug")
148+
.build());
149+
manager.commit();
150+
System.out.println("New singer inserted.");
151+
}
152+
}
153+
154+
// [END spanner_set_exclude_txn_from_change_streams]
155+
156+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.example.spanner;
18+
19+
import static com.example.spanner.SampleRunner.runSample;
20+
import static com.google.common.truth.Truth.assertThat;
21+
22+
import com.google.cloud.spanner.DatabaseClient;
23+
import com.google.cloud.spanner.DatabaseId;
24+
import com.google.cloud.spanner.KeySet;
25+
import com.google.cloud.spanner.Mutation;
26+
import com.google.common.collect.ImmutableList;
27+
import java.util.Arrays;
28+
import java.util.Collections;
29+
import org.junit.After;
30+
import org.junit.Before;
31+
import org.junit.BeforeClass;
32+
import org.junit.Test;
33+
import org.junit.runner.RunWith;
34+
import org.junit.runners.JUnit4;
35+
36+
/**
37+
* Integration tests for {@link ChangeStreamsTxnExclusionSample}
38+
*/
39+
@RunWith(JUnit4.class)
40+
public class ChangeStreamsTxnExclusionSampleIT extends SampleTestBase {
41+
42+
private static DatabaseId databaseId;
43+
44+
@BeforeClass
45+
public static void createTestDatabase() throws Exception {
46+
final String database = idGenerator.generateDatabaseId();
47+
databaseAdminClient
48+
.createDatabase(
49+
instanceId,
50+
database,
51+
ImmutableList.of(
52+
"CREATE TABLE Singers ("
53+
+ " SingerId INT64 NOT NULL,"
54+
+ " FirstName STRING(1024),"
55+
+ " LastName STRING(1024),"
56+
+ " SingerInfo BYTES(MAX)"
57+
+ ") PRIMARY KEY (SingerId)"))
58+
.get();
59+
databaseId = DatabaseId.of(projectId, instanceId, database);
60+
}
61+
62+
@Before
63+
public void insertTestData() {
64+
final DatabaseClient client = spanner.getDatabaseClient(databaseId);
65+
client.write(Arrays.asList(
66+
Mutation.newInsertBuilder("Singers")
67+
.set("SingerId")
68+
.to(1L)
69+
.set("FirstName")
70+
.to("first name 1")
71+
.set("LastName")
72+
.to("last name 1")
73+
.build(),
74+
Mutation.newInsertBuilder("Singers")
75+
.set("SingerId")
76+
.to(2L)
77+
.set("FirstName")
78+
.to("first name 2")
79+
.set("LastName")
80+
.to("last name 2")
81+
.build()
82+
));
83+
}
84+
85+
@After
86+
public void removeTestData() {
87+
final DatabaseClient client = spanner.getDatabaseClient(databaseId);
88+
client.write(Collections.singletonList(Mutation.delete("Singers", KeySet.all())));
89+
}
90+
91+
@Test
92+
public void testSetExcludeTxnFromChangeStreamsSampleSample() throws Exception {
93+
final DatabaseClient client = spanner.getDatabaseClient(databaseId);
94+
String out = runSample(() -> ChangeStreamsTxnExclusionSample.rwTxnExcludedFromChangeStreams(client));
95+
assertThat(out).contains("New singer inserted.");
96+
assertThat(out).contains("Singer first name updated.");
97+
98+
out = runSample(() -> ChangeStreamsTxnExclusionSample.writeExcludedFromChangeStreams(client));
99+
assertThat(out).contains("New singer inserted.");
100+
101+
out = runSample(() -> ChangeStreamsTxnExclusionSample.writeAtLeastOnceExcludedFromChangeStreams(client));
102+
assertThat(out).contains("New singer inserted.");
103+
104+
out = runSample(() -> ChangeStreamsTxnExclusionSample.batchWriteAtLeastOnceExcludedFromChangeStreams(client));
105+
assertThat(out).contains("New singer inserted.");
106+
107+
out = runSample(() -> ChangeStreamsTxnExclusionSample.pdmlExcludedFromChangeStreams(client));
108+
assertThat(out).contains("Singers deleted.");
109+
110+
out = runSample(() -> ChangeStreamsTxnExclusionSample.txnManagerExcludedFromChangeStreams(client));
111+
assertThat(out).contains("New singer inserted.");
112+
}
113+
}

0 commit comments

Comments
 (0)