Skip to content

Commit ce48177

Browse files
authored
[DE-131] Bugfix activefailover concurrency (#423)
* using futures in queueTime tests * added concurrency tests * CI: print AF leader * AF: check host before closing * AF: keep failed host with pending requests * tests shutdown
1 parent 8902892 commit ce48177

File tree

12 files changed

+140
-12
lines changed

12 files changed

+140
-12
lines changed

docker/start_db.sh

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,9 @@ for a in ${COORDINATORS[*]} ; do
9696
echo "$SCHEME://$a"
9797
echo ""
9898
done
99+
100+
if [ "$STARTER_MODE" == "activefailover" ]; then
101+
LEADER=$("$LOCATION"/find_active_endpoint.sh)
102+
echo "Leader: $SCHEME://$LEADER"
103+
echo ""
104+
fi

src/main/java/com/arangodb/async/internal/velocystream/VstCommunicationAsync.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,7 @@ protected CompletableFuture<Response> execute(final Request request, final VstCo
8585
}
8686
final String location = e.getLocation();
8787
final HostDescription redirectHost = HostUtils.createFromLocation(location);
88-
hostHandler.closeCurrentOnError();
89-
hostHandler.fail(e);
88+
hostHandler.failIfNotMatch(redirectHost, e);
9089
execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1)
9190
.whenComplete((v, err) -> {
9291
if (v != null) {

src/main/java/com/arangodb/internal/http/HttpCommunication.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,7 @@ private Response execute(final Request request, final HostHandle hostHandle, fin
106106
if (e instanceof ArangoDBRedirectException && attemptCount < 3) {
107107
final String location = ((ArangoDBRedirectException) e).getLocation();
108108
final HostDescription redirectHost = HostUtils.createFromLocation(location);
109-
hostHandler.closeCurrentOnError();
110-
hostHandler.fail(e);
109+
hostHandler.failIfNotMatch(redirectHost, e);
111110
return execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1);
112111
} else {
113112
throw e;

src/main/java/com/arangodb/internal/net/DirtyReadHostHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,11 @@ public void fail(Exception exception) {
6060
determineHostHandler().fail(exception);
6161
}
6262

63+
@Override
64+
public void failIfNotMatch(HostDescription host, Exception exception) {
65+
determineHostHandler().failIfNotMatch(host, exception);
66+
}
67+
6368
@Override
6469
public void reset() {
6570
determineHostHandler().reset();
@@ -81,6 +86,11 @@ public void closeCurrentOnError() {
8186
determineHostHandler().closeCurrentOnError();
8287
}
8388

89+
@Override
90+
public void closeCurrentOnErrorIfNotMatch(HostDescription host) {
91+
determineHostHandler().closeCurrentOnErrorIfNotMatch(host);
92+
}
93+
8494
@Override
8595
public void setJwt(String jwt) {
8696
master.setJwt(jwt);

src/main/java/com/arangodb/internal/net/FallbackHostHandler.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,13 @@ public void fail(Exception exception) {
7979
lastFailExceptions.add(exception);
8080
}
8181

82+
@Override
83+
public synchronized void failIfNotMatch(HostDescription host, Exception exception) {
84+
if (!host.equals(current.getDescription())) {
85+
fail(exception);
86+
}
87+
}
88+
8289
@Override
8390
public void reset() {
8491
iterations = 0;
@@ -104,6 +111,13 @@ public void closeCurrentOnError() {
104111
current.closeOnError();
105112
}
106113

114+
@Override
115+
public synchronized void closeCurrentOnErrorIfNotMatch(HostDescription host) {
116+
if (!host.equals(current.getDescription())) {
117+
closeCurrentOnError();
118+
}
119+
}
120+
107121
@Override
108122
public void setJwt(String jwt) {
109123
hosts.setJwt(jwt);

src/main/java/com/arangodb/internal/net/HostHandler.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ public interface HostHandler {
3333

3434
void fail(Exception exception);
3535

36+
void failIfNotMatch(HostDescription host, Exception exception);
37+
3638
void reset();
3739

3840
void confirm();
@@ -41,6 +43,8 @@ public interface HostHandler {
4143

4244
void closeCurrentOnError();
4345

46+
void closeCurrentOnErrorIfNotMatch(HostDescription host);
47+
4448
void setJwt(String jwt);
4549

4650
}

src/main/java/com/arangodb/internal/net/RandomHostHandler.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,13 @@ public void fail(Exception exception) {
5959
current = fallback.get(null, null);
6060
}
6161

62+
@Override
63+
public synchronized void failIfNotMatch(HostDescription host, Exception exception) {
64+
if (!host.equals(current.getDescription())) {
65+
fail(exception);
66+
}
67+
}
68+
6269
private Host getRandomHost(final boolean initial, final boolean closeConnections) {
6370
hosts = resolver.resolve(initial, closeConnections);
6471
final ArrayList<Host> hostList = new ArrayList<>(hosts.getHostsList());
@@ -85,6 +92,13 @@ public void closeCurrentOnError() {
8592
current.closeOnError();
8693
}
8794

95+
@Override
96+
public synchronized void closeCurrentOnErrorIfNotMatch(HostDescription host) {
97+
if (!host.equals(current.getDescription())) {
98+
closeCurrentOnError();
99+
}
100+
}
101+
88102
@Override
89103
public void setJwt(String jwt) {
90104
fallback.setJwt(jwt);

src/main/java/com/arangodb/internal/net/RoundRobinHostHandler.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,11 @@ public void fail(Exception exception) {
8989
lastFailExceptions.add(exception);
9090
}
9191

92+
@Override
93+
public void failIfNotMatch(HostDescription host, Exception exception) {
94+
fail(exception);
95+
}
96+
9297
@Override
9398
public void reset() {
9499
fails = 0;
@@ -109,6 +114,11 @@ public void closeCurrentOnError() {
109114
currentHost.closeOnError();
110115
}
111116

117+
@Override
118+
public void closeCurrentOnErrorIfNotMatch(HostDescription host) {
119+
closeCurrentOnError();
120+
}
121+
112122
@Override
113123
public void setJwt(String jwt) {
114124
hosts.setJwt(jwt);

src/main/java/com/arangodb/internal/velocystream/VstCommunicationSync.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -151,8 +151,7 @@ protected Response execute(final Request request, final VstConnectionSync connec
151151
}
152152
final String location = e.getLocation();
153153
final HostDescription redirectHost = HostUtils.createFromLocation(location);
154-
hostHandler.closeCurrentOnError();
155-
hostHandler.fail(e);
154+
hostHandler.failIfNotMatch(redirectHost, e);
156155
return execute(request, new HostHandle().setHost(redirectHost), attemptCount + 1);
157156
}
158157
}

src/test/java/com/arangodb/ArangoDBTest.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,9 @@
4444
import java.util.Map;
4545
import java.util.Optional;
4646
import java.util.UUID;
47+
import java.util.concurrent.CompletableFuture;
48+
import java.util.concurrent.ExecutionException;
49+
import java.util.concurrent.Executors;
4750
import java.util.stream.Collectors;
4851
import java.util.stream.IntStream;
4952

@@ -725,13 +728,15 @@ public void accessMultipleDatabases() {
725728
}
726729

727730
@Test
728-
public void queueTime() throws InterruptedException {
729-
List<Thread> threads = IntStream.range(0, 80)
730-
.mapToObj(__ -> new Thread(() -> arangoDB.db().query("RETURN SLEEP(1)", Void.class)))
731+
public void queueTime() throws InterruptedException, ExecutionException {
732+
List<CompletableFuture<Void>> futures = IntStream.range(0, 80)
733+
.mapToObj(i -> CompletableFuture.runAsync(
734+
() -> arangoDB.db().query("RETURN SLEEP(1)", Void.class),
735+
Executors.newFixedThreadPool(80))
736+
)
731737
.collect(Collectors.toList());
732-
threads.forEach(Thread::start);
733-
for (Thread it : threads) {
734-
it.join();
738+
for (CompletableFuture<Void> f : futures) {
739+
f.get();
735740
}
736741

737742
QueueTimeMetrics qt = arangoDB.metrics().getQueueTime();
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package com.arangodb;
2+
3+
import org.junit.Test;
4+
import org.junit.runner.RunWith;
5+
import org.junit.runners.Parameterized;
6+
7+
import java.util.List;
8+
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.ExecutionException;
10+
import java.util.concurrent.Executors;
11+
import java.util.stream.Collectors;
12+
import java.util.stream.IntStream;
13+
14+
@RunWith(Parameterized.class)
15+
public class ConcurrencyTests {
16+
17+
final Protocol protocol;
18+
19+
public ConcurrencyTests(Protocol protocol) {
20+
this.protocol = protocol;
21+
}
22+
23+
@Parameterized.Parameters
24+
public static Protocol[] protocols() {
25+
return Protocol.values();
26+
}
27+
28+
@Test
29+
public void concurrentPendingRequests() throws ExecutionException, InterruptedException {
30+
ArangoDB adb = new ArangoDB.Builder().useProtocol(protocol).build();
31+
List<CompletableFuture<Void>> futures = IntStream.range(0, 10)
32+
.mapToObj(i -> CompletableFuture.runAsync(
33+
() -> adb.db().query("RETURN SLEEP(1)", Void.class),
34+
Executors.newFixedThreadPool(10))
35+
)
36+
.collect(Collectors.toList());
37+
for (CompletableFuture<Void> f : futures) {
38+
f.get();
39+
}
40+
adb.shutdown();
41+
}
42+
43+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package com.arangodb.async;
2+
3+
import org.junit.Test;
4+
5+
import java.util.List;
6+
import java.util.concurrent.CompletableFuture;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.stream.Collectors;
9+
import java.util.stream.IntStream;
10+
11+
public class ConcurrencyTests {
12+
13+
@Test
14+
public void concurrentPendingRequests() throws ExecutionException, InterruptedException {
15+
ArangoDBAsync adb = new ArangoDBAsync.Builder().build();
16+
List<CompletableFuture<ArangoCursorAsync<Void>>> reqs = IntStream.range(0, 10)
17+
.mapToObj(__ -> adb.db().query("RETURN SLEEP(1)", Void.class))
18+
.collect(Collectors.toList());
19+
for (CompletableFuture<ArangoCursorAsync<Void>> req : reqs) {
20+
req.get();
21+
}
22+
adb.shutdown();
23+
}
24+
25+
}

0 commit comments

Comments
 (0)