Skip to content

Commit 334ee13

Browse files
authored
[DE-1016] Async connection pool (#602)
* ConnectionLoadBalanceTest * added pipelining option * async connection pool * increase test waiting time * non-blocking AsyncQueue::offer() * refactoring AsyncQueue
1 parent 9b8e9c6 commit 334ee13

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+348
-96
lines changed

core/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<relativePath>../release-parent</relativePath>
99
<groupId>com.arangodb</groupId>
1010
<artifactId>release-parent</artifactId>
11-
<version>7.18.0</version>
11+
<version>7.19.0-SNAPSHOT</version>
1212
</parent>
1313

1414
<name>core</name>

core/src/main/java/com/arangodb/ArangoDB.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,17 @@ public Builder chunkSize(final Integer chunkSize) {
517517
return this;
518518
}
519519

520+
/**
521+
* Set whether to use requests pipelining in HTTP/1.1 ({@link Protocol#HTTP_JSON} or {@link Protocol#HTTP_VPACK}).
522+
*
523+
* @param pipelining {@code true} if enabled
524+
* @return {@link ArangoDB.Builder}
525+
*/
526+
public Builder pipelining(final Boolean pipelining) {
527+
config.setPipelining(pipelining);
528+
return this;
529+
}
530+
520531
/**
521532
* Sets the maximum number of connections the built in connection pool will open per host.
522533
*

core/src/main/java/com/arangodb/config/ArangoConfigProperties.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public interface ArangoConfigProperties {
2121
String KEY_USE_SSL = "useSsl";
2222
String KEY_VERIFY_HOST = "verifyHost";
2323
String KEY_CHUNK_SIZE = "chunkSize";
24+
String KEY_PIPELINING = "pipelining";
2425
String KEY_MAX_CONNECTIONS = "maxConnections";
2526
String KEY_CONNECTION_TTL = "connectionTtl";
2627
String KEY_KEEP_ALIVE_INTERVAL = "keepAliveInterval";
@@ -110,6 +111,10 @@ default Optional<Integer> getChunkSize() {
110111
return Optional.empty();
111112
}
112113

114+
default Optional<Boolean> getPipelining() {
115+
return Optional.empty();
116+
}
117+
113118
default Optional<Integer> getMaxConnections() {
114119
return Optional.empty();
115120
}

core/src/main/java/com/arangodb/internal/ArangoDefaults.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ public final class ArangoDefaults {
5050
public static final Boolean DEFAULT_USE_SSL = false;
5151
public static final Boolean DEFAULT_VERIFY_HOST = true;
5252
public static final Integer DEFAULT_CHUNK_SIZE = 30_000;
53+
public static final Boolean DEFAULT_PIPELINING = false;
5354
public static final Boolean DEFAULT_ACQUIRE_HOST_LIST = false;
5455
public static final Integer DEFAULT_ACQUIRE_HOST_LIST_INTERVAL = 60 * 60 * 1000; // hour
5556
public static final LoadBalancingStrategy DEFAULT_LOAD_BALANCING_STRATEGY = LoadBalancingStrategy.NONE;

core/src/main/java/com/arangodb/internal/config/ArangoConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class ArangoConfig {
3333
private SSLContext sslContext;
3434
private Boolean verifyHost;
3535
private Integer chunkSize;
36+
private Boolean pipelining;
3637
private Integer maxConnections;
3738
private Long connectionTtl;
3839
private Integer keepAliveInterval;
@@ -70,6 +71,7 @@ public void loadProperties(final ArangoConfigProperties properties) {
7071
useSsl = properties.getUseSsl().orElse(ArangoDefaults.DEFAULT_USE_SSL);
7172
verifyHost = properties.getVerifyHost().orElse(ArangoDefaults.DEFAULT_VERIFY_HOST);
7273
chunkSize = properties.getChunkSize().orElse(ArangoDefaults.DEFAULT_CHUNK_SIZE);
74+
pipelining = properties.getPipelining().orElse(ArangoDefaults.DEFAULT_PIPELINING);
7375
// FIXME: make maxConnections field Optional
7476
maxConnections = properties.getMaxConnections().orElse(null);
7577
// FIXME: make connectionTtl field Optional
@@ -173,6 +175,14 @@ public void setChunkSize(Integer chunkSize) {
173175
this.chunkSize = chunkSize;
174176
}
175177

178+
public Boolean getPipelining() {
179+
return pipelining;
180+
}
181+
182+
public void setPipelining(Boolean pipelining) {
183+
this.pipelining = pipelining;
184+
}
185+
176186
public Integer getMaxConnections() {
177187
if (maxConnections == null) {
178188
maxConnections = getDefaultMaxConnections();

core/src/main/java/com/arangodb/internal/config/ArangoConfigPropertiesImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,11 @@ public Optional<Integer> getChunkSize() {
119119
return Optional.ofNullable(getProperty(KEY_CHUNK_SIZE)).map(Integer::valueOf);
120120
}
121121

122+
@Override
123+
public Optional<Boolean> getPipelining() {
124+
return Optional.ofNullable(getProperty(KEY_PIPELINING)).map(Boolean::valueOf);
125+
}
126+
122127
@Override
123128
public Optional<Integer> getMaxConnections() {
124129
return Optional.ofNullable(getProperty(KEY_MAX_CONNECTIONS)).map(Integer::valueOf);

core/src/main/java/com/arangodb/internal/net/Communication.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ public CompletableFuture<InternalResponse> executeAsync(final InternalRequest re
5050

5151
private CompletableFuture<InternalResponse> executeAsync(final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount) {
5252
long reqId = reqCount.getAndIncrement();
53-
return doExecuteAsync(request, hostHandle, host, attemptCount, host.connection(), reqId);
53+
return host.connection().thenCompose(c ->
54+
doExecuteAsync(request, hostHandle, host, attemptCount, c, reqId)
55+
.whenComplete((r, t) -> c.release()));
5456
}
5557

5658
private CompletableFuture<InternalResponse> doExecuteAsync(

core/src/main/java/com/arangodb/internal/net/Connection.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,6 @@ public interface Connection extends Closeable {
3535
void setJwt(String jwt);
3636

3737
CompletableFuture<InternalResponse> executeAsync(InternalRequest request);
38+
39+
void release();
3840
}

core/src/main/java/com/arangodb/internal/net/ConnectionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,5 @@
2929
*/
3030
@UsedInApi
3131
public interface ConnectionFactory {
32-
Connection create(ArangoConfig config, HostDescription host);
32+
Connection create(ArangoConfig config, HostDescription host, ConnectionPool pool);
3333
}

core/src/main/java/com/arangodb/internal/net/ConnectionPool.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,18 +20,22 @@
2020

2121
package com.arangodb.internal.net;
2222

23-
import com.arangodb.config.HostDescription;
23+
import com.arangodb.arch.UsedInApi;
2424

2525
import java.io.Closeable;
26+
import java.util.concurrent.CompletableFuture;
2627

2728
/**
2829
* @author Mark Vollmary
2930
*/
31+
@UsedInApi
3032
public interface ConnectionPool extends Closeable {
3133

32-
Connection createConnection(final HostDescription host);
34+
Connection createConnection();
3335

34-
Connection connection();
36+
CompletableFuture<Connection> connection();
37+
38+
void release(final Connection connection);
3539

3640
void setJwt(String jwt);
3741

core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java

Lines changed: 35 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -23,64 +23,77 @@
2323
import com.arangodb.ArangoDBException;
2424
import com.arangodb.config.HostDescription;
2525
import com.arangodb.internal.config.ArangoConfig;
26+
import com.arangodb.internal.util.AsyncQueue;
2627

2728
import java.io.IOException;
28-
import java.util.ArrayList;
2929
import java.util.List;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.CopyOnWriteArrayList;
3032

31-
/**
32-
* @author Mark Vollmary
33-
*/
3433
public class ConnectionPoolImpl implements ConnectionPool {
3534

35+
public static final int HTTP1_SLOTS = 1; // HTTP/1: max 1 pending request
36+
public static final int HTTP1_SLOTS_PIPELINING = 10; // HTTP/1: max pipelining
37+
public static final int HTTP2_SLOTS = 32; // HTTP/2: max streams, hard-coded see BTS-2049
38+
39+
private final AsyncQueue<Connection> slots = new AsyncQueue<>();
3640
private final HostDescription host;
3741
private final ArangoConfig config;
3842
private final int maxConnections;
3943
private final List<Connection> connections;
4044
private final ConnectionFactory factory;
41-
private int current;
45+
private final int maxSlots;
4246
private volatile String jwt = null;
43-
private boolean closed = false;
47+
private volatile boolean closed = false;
4448

4549
public ConnectionPoolImpl(final HostDescription host, final ArangoConfig config, final ConnectionFactory factory) {
4650
super();
4751
this.host = host;
4852
this.config = config;
4953
this.maxConnections = config.getMaxConnections();
5054
this.factory = factory;
51-
connections = new ArrayList<>();
52-
current = 0;
55+
connections = new CopyOnWriteArrayList<>();
56+
switch (config.getProtocol()) {
57+
case HTTP_JSON:
58+
case HTTP_VPACK:
59+
maxSlots = config.getPipelining() ? HTTP1_SLOTS_PIPELINING : HTTP1_SLOTS;
60+
break;
61+
default:
62+
maxSlots = HTTP2_SLOTS;
63+
}
5364
}
5465

5566
@Override
56-
public Connection createConnection(final HostDescription host) {
57-
Connection c = factory.create(config, host);
67+
public Connection createConnection() {
68+
Connection c = factory.create(config, host, this);
5869
c.setJwt(jwt);
5970
return c;
6071
}
6172

6273
@Override
63-
public synchronized Connection connection() {
74+
public CompletableFuture<Connection> connection() {
6475
if (closed) {
6576
throw new ArangoDBException("Connection pool already closed!");
6677
}
6778

68-
final Connection connection;
69-
7079
if (connections.size() < maxConnections) {
71-
connection = createConnection(host);
80+
Connection connection = createConnection();
7281
connections.add(connection);
73-
current++;
74-
} else {
75-
final int index = Math.floorMod(current++, connections.size());
76-
connection = connections.get(index);
82+
for (int i = 0; i < maxSlots; i++) {
83+
slots.offer((connection));
84+
}
7785
}
7886

79-
return connection;
87+
return slots.poll();
88+
}
89+
90+
@Override
91+
public void release(Connection connection) {
92+
slots.offer(connection);
8093
}
8194

8295
@Override
83-
public synchronized void setJwt(String jwt) {
96+
public void setJwt(String jwt) {
8497
if (jwt != null) {
8598
this.jwt = jwt;
8699
for (Connection connection : connections) {
@@ -90,18 +103,17 @@ public synchronized void setJwt(String jwt) {
90103
}
91104

92105
@Override
93-
public synchronized void close() throws IOException {
106+
public void close() throws IOException {
94107
closed = true;
95108
for (final Connection connection : connections) {
96109
connection.close();
97110
}
98-
connections.clear();
99111
}
100112

101113
@Override
102114
public String toString() {
103115
return "ConnectionPoolImpl [host=" + host + ", maxConnections=" + maxConnections + ", connections="
104-
+ connections.size() + ", current=" + current + ", factory=" + factory.getClass().getSimpleName() + "]";
116+
+ connections.size() + ", factory=" + factory.getClass().getSimpleName() + "]";
105117
}
106118

107119
}

core/src/main/java/com/arangodb/internal/net/Host.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.arangodb.config.HostDescription;
2525

2626
import java.io.IOException;
27+
import java.util.concurrent.CompletableFuture;
2728

2829
/**
2930
* @author Mark Vollmary
@@ -33,9 +34,7 @@ public interface Host {
3334

3435
HostDescription getDescription();
3536

36-
Connection connection();
37-
38-
void closeOnError();
37+
CompletableFuture<Connection> connection();
3938

4039
void close() throws IOException;
4140

@@ -44,5 +43,4 @@ public interface Host {
4443
void setMarkforDeletion(boolean markforDeletion);
4544

4645
void setJwt(String jwt);
47-
4846
}

core/src/main/java/com/arangodb/internal/net/HostImpl.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020

2121
package com.arangodb.internal.net;
2222

23-
import com.arangodb.ArangoDBException;
2423
import com.arangodb.config.HostDescription;
2524

2625
import java.io.IOException;
26+
import java.util.concurrent.CompletableFuture;
2727

2828
/**
2929
* @author Mark Vollmary
@@ -51,19 +51,10 @@ public HostDescription getDescription() {
5151
}
5252

5353
@Override
54-
public Connection connection() {
54+
public CompletableFuture<Connection> connection() {
5555
return connectionPool.connection();
5656
}
5757

58-
@Override
59-
public void closeOnError() {
60-
try {
61-
connectionPool.close();
62-
} catch (final IOException e) {
63-
throw ArangoDBException.of(e);
64-
}
65-
}
66-
6758
@Override
6859
public String toString() {
6960
return "HostImpl [connectionPool=" + connectionPool + ", description=" + description + ", markforDeletion="
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package com.arangodb.internal.util;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import java.util.ArrayDeque;
7+
import java.util.Queue;
8+
import java.util.concurrent.*;
9+
10+
public class AsyncQueue<T> {
11+
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncQueue.class);
12+
private final Queue<CompletableFuture<T>> requests = new ConcurrentLinkedQueue<>();
13+
private final Queue<T> offers = new ArrayDeque<>();
14+
15+
public synchronized CompletableFuture<T> poll() {
16+
LOGGER.trace("poll()");
17+
T o = offers.poll();
18+
if (o != null) {
19+
LOGGER.trace("poll(): short-circuit: {}", o);
20+
return CompletableFuture.completedFuture(o);
21+
}
22+
CompletableFuture<T> r = new CompletableFuture<>();
23+
LOGGER.trace("poll(): enqueue request: {}", r);
24+
requests.add(r);
25+
return r;
26+
}
27+
28+
public void offer(T o) {
29+
LOGGER.trace("offer({})", o);
30+
CompletableFuture<T> r = requests.poll();
31+
if (r == null) {
32+
synchronized (this) {
33+
r = requests.poll();
34+
if (r == null) {
35+
LOGGER.trace("offer({}): enqueue", o);
36+
offers.add(o);
37+
}
38+
}
39+
}
40+
if (r != null) {
41+
LOGGER.trace("offer({}): short-circuit: {}", o, r);
42+
r.complete(o);
43+
}
44+
}
45+
}

docker/start_db.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ docker run -d \
6666
--starter.address="${GW}" \
6767
--docker.image="${DOCKER_IMAGE}" \
6868
--starter.local --starter.mode=${STARTER_MODE} --all.log.level=debug --all.log.output=+ --log.verbose \
69-
--all.server.descriptors-minimum=1024 --all.javascript.allow-admin-execute=true
69+
--all.server.descriptors-minimum=1024 --all.javascript.allow-admin-execute=true --all.server.maximal-threads=128
7070

7171

7272
wait_server() {

driver/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<relativePath>../release-parent</relativePath>
99
<groupId>com.arangodb</groupId>
1010
<artifactId>release-parent</artifactId>
11-
<version>7.18.0</version>
11+
<version>7.19.0-SNAPSHOT</version>
1212
</parent>
1313

1414
<name>arangodb-java-driver</name>

http-protocol/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
<relativePath>../release-parent</relativePath>
99
<groupId>com.arangodb</groupId>
1010
<artifactId>release-parent</artifactId>
11-
<version>7.18.0</version>
11+
<version>7.19.0-SNAPSHOT</version>
1212
</parent>
1313

1414
<name>http-protocol</name>

0 commit comments

Comments
 (0)