Skip to content

Commit 6f03fab

Browse files
authored
test(fix): testing the serializability behaviour without relying on the underlying concurrency (#1079)
fix #497 Since we are testing the behaviour _(i.e. if two transactions conflict with each other, the database guarantees that only one can commit successfully at a time)_ now, both of the tests are now agnostic of underlying concurrency mechanism. This makes sure that both of the tests continue to pass in any gcloud project .
1 parent 2fca5c8 commit 6f03fab

File tree

2 files changed

+155
-38
lines changed

2 files changed

+155
-38
lines changed

google-cloud-datastore/src/test/java/com/google/cloud/datastore/it/ITDatastoreTest.java

Lines changed: 64 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import static org.junit.Assert.fail;
3030

3131
import com.google.cloud.Timestamp;
32+
import com.google.cloud.Tuple;
3233
import com.google.cloud.datastore.AggregationQuery;
3334
import com.google.cloud.datastore.Batch;
3435
import com.google.cloud.datastore.BooleanValue;
@@ -335,57 +336,82 @@ public void testNewTransactionCommit() {
335336
}
336337

337338
@Test
338-
public void testTransactionWithRead() {
339-
Transaction transaction = DATASTORE.newTransaction();
340-
assertNull(transaction.get(KEY3));
341-
transaction.add(ENTITY3);
342-
transaction.commit();
339+
public void testTransactionWithRead() throws Exception {
340+
StatementExecutor statementExecutor = new StatementExecutor();
341+
Transaction baseTransaction = DATASTORE.newTransaction();
342+
assertNull(baseTransaction.get(KEY3));
343+
baseTransaction.add(ENTITY3);
344+
baseTransaction.commit();
343345
assertEquals(ENTITY3, DATASTORE.get(KEY3));
344346

345-
transaction = DATASTORE.newTransaction();
346-
assertEquals(ENTITY3, transaction.get(KEY3));
347-
// update entity3 during the transaction
348-
DATASTORE.put(Entity.newBuilder(ENTITY2).clear().set("from", "datastore").build());
349-
transaction.update(Entity.newBuilder(ENTITY2).clear().set("from", "transaction").build());
350-
try {
351-
transaction.commit();
352-
fail("Expecting a failure");
353-
} catch (DatastoreException expected) {
354-
assertEquals("ABORTED", expected.getReason());
355-
}
347+
Transaction transaction = DATASTORE.newTransaction();
348+
statementExecutor.execute(
349+
Tuple.of("T1", () -> assertEquals(ENTITY3, transaction.get(KEY3))),
350+
// update entity3 during the transaction, will be blocked in case of pessimistic concurrency
351+
Tuple.of(
352+
"T2",
353+
() ->
354+
DATASTORE.put(Entity.newBuilder(ENTITY3).clear().set("from", "datastore").build())),
355+
Tuple.of(
356+
"T1",
357+
() ->
358+
transaction.update(
359+
Entity.newBuilder(ENTITY3).clear().set("from", "transaction").build())),
360+
Tuple.of("T1", transaction::commit) // T1 will throw error in case of optimistic concurrency
361+
);
362+
363+
boolean t1AllPassed = statementExecutor.didAllPass("T1");
364+
boolean t2AllPassed = statementExecutor.didAllPass("T2");
365+
// If two transactions conflict with each other, the database guarantees that only
366+
// one can commit successfully at a time. Please refer to StatementExecutor class for more info.
367+
// Using XOR to ensure that only one of transaction group is successful,
368+
boolean onlyOneTransactionIsSuccessful = t1AllPassed ^ t2AllPassed;
369+
370+
assertThat(onlyOneTransactionIsSuccessful).isTrue();
356371
}
357372

358373
@Test
359-
public void testTransactionWithQuery() {
374+
public void testTransactionWithQuery() throws Exception {
375+
StatementExecutor statementExecutor = new StatementExecutor();
360376
Query<Entity> query =
361377
Query.newEntityQueryBuilder()
362378
.setKind(KIND2)
363379
.setFilter(PropertyFilter.hasAncestor(KEY2))
364380
.setNamespace(NAMESPACE)
365381
.build();
366-
Transaction transaction = DATASTORE.newTransaction();
367-
QueryResults<Entity> results = transaction.run(query);
368-
assertTrue(results.hasNext());
369-
assertEquals(ENTITY2, results.next());
370-
assertFalse(results.hasNext());
371-
transaction.add(ENTITY3);
372-
transaction.commit();
382+
Transaction baseTransaction = DATASTORE.newTransaction();
383+
QueryResults<Entity> baseResults = baseTransaction.run(query);
384+
assertTrue(baseResults.hasNext());
385+
assertEquals(ENTITY2, baseResults.next());
386+
assertFalse(baseResults.hasNext());
387+
baseTransaction.add(ENTITY3);
388+
baseTransaction.commit();
373389
assertEquals(ENTITY3, DATASTORE.get(KEY3));
374390

375-
transaction = DATASTORE.newTransaction();
376-
results = transaction.run(query);
377-
assertTrue(results.hasNext());
378-
assertEquals(ENTITY2, results.next());
379-
assertFalse(results.hasNext());
380-
transaction.delete(ENTITY3.getKey());
381-
// update entity2 during the transaction
382-
DATASTORE.put(Entity.newBuilder(ENTITY2).clear().build());
383-
try {
384-
transaction.commit();
385-
fail("Expecting a failure");
386-
} catch (DatastoreException expected) {
387-
assertEquals("ABORTED", expected.getReason());
388-
}
391+
Transaction transaction = DATASTORE.newTransaction();
392+
statementExecutor.execute(
393+
Tuple.of(
394+
"T1",
395+
() -> {
396+
QueryResults<Entity> results = transaction.run(query);
397+
assertTrue(results.hasNext());
398+
assertEquals(ENTITY2, results.next());
399+
assertFalse(results.hasNext());
400+
}),
401+
Tuple.of("T1", () -> transaction.delete(ENTITY3.getKey())),
402+
// update entity2 during the transaction, will be blocked in case of pessimistic concurrency
403+
Tuple.of("T2", () -> DATASTORE.put(Entity.newBuilder(ENTITY2).clear().build())),
404+
Tuple.of("T1", transaction::commit) // T1 will throw error in case of optimistic concurrency
405+
);
406+
407+
boolean t1AllPassed = statementExecutor.didAllPass("T1");
408+
boolean t2AllPassed = statementExecutor.didAllPass("T2");
409+
// If two transactions conflict with each other, the database guarantees that only
410+
// one can commit successfully at a time. Please refer to StatementExecutor class for more info.
411+
// Using XOR to ensure that only one of transaction group is successful,
412+
boolean onlyOneTransactionIsSuccessful = t1AllPassed ^ t2AllPassed;
413+
414+
assertThat(onlyOneTransactionIsSuccessful).isTrue();
389415
}
390416

391417
@Test
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.datastore.it;
18+
19+
import static java.util.concurrent.TimeUnit.SECONDS;
20+
21+
import com.google.cloud.Tuple;
22+
import com.google.cloud.datastore.DatastoreException;
23+
import com.google.common.collect.ArrayListMultimap;
24+
import com.google.common.collect.Multimap;
25+
import java.util.concurrent.ExecutionException;
26+
import java.util.concurrent.ExecutorService;
27+
import java.util.concurrent.Executors;
28+
import java.util.concurrent.Future;
29+
import java.util.concurrent.TimeoutException;
30+
31+
/**
32+
* An executor class to handle interleaved transactions.
33+
*
34+
* <p>It executes statements (under multiple transactions) and record their failures under a groupId
35+
* provided by users.
36+
*/
37+
class StatementExecutor {
38+
39+
private final Multimap<String, Exception> failures = ArrayListMultimap.create();
40+
41+
/**
42+
* Executes a list of {@link Statement} one by one and record their failures under the groupId. In
43+
* case of pessimistic concurrency, a statement will be blocked and cause delay until another
44+
* transaction which was started earlier is committed. In case of optimistic concurrency, both
45+
* transaction can perform their operation simultaneously, but the one which commits first will be
46+
* a winner and other one will get an error on commit operation indicating a need for retry.
47+
*
48+
* @param tuples A {@link Statement(String, String) Tuple(&lt;String, Statement&gt;)} has a
49+
* groupId of {@link String} type and a {@link Statement} to execute.
50+
*/
51+
@SafeVarargs
52+
final void execute(Tuple<String, Statement>... tuples) throws Exception {
53+
ExecutorService executorService = Executors.newSingleThreadExecutor();
54+
for (Tuple<String, Statement> tuple : tuples) {
55+
String groupId = tuple.x();
56+
Statement statement = tuple.y();
57+
Future<?> future = executorService.submit(statement::execute);
58+
try {
59+
// waiting for statement to execute
60+
future.get(10, SECONDS);
61+
} catch (Exception exception) {
62+
future.cancel(true);
63+
if (transactionConflict(exception)) {
64+
failures.put(groupId, exception);
65+
} else {
66+
throw exception;
67+
}
68+
}
69+
}
70+
executorService.shutdown();
71+
}
72+
73+
boolean didAllPass(String groupId) {
74+
return failures.get(groupId).isEmpty();
75+
}
76+
77+
private boolean transactionConflict(Exception exception) {
78+
if (exception instanceof TimeoutException) { // timed out coz of pessimistic concurrency delay
79+
return true;
80+
}
81+
return exception instanceof ExecutionException
82+
&& exception.getCause().getClass() == DatastoreException.class
83+
&& exception
84+
.getMessage()
85+
.contains("contention"); // exception raise coz of optimistic concurrency
86+
}
87+
88+
interface Statement {
89+
void execute();
90+
}
91+
}

0 commit comments

Comments
 (0)