Skip to content

Commit 0096781

Browse files
committed
async connection pool
1 parent 147a632 commit 0096781

File tree

12 files changed

+115
-28
lines changed

12 files changed

+115
-28
lines changed

core/src/main/java/com/arangodb/internal/net/Communication.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ public CompletableFuture<InternalResponse> executeAsync(final InternalRequest re
5050

5151
private CompletableFuture<InternalResponse> executeAsync(final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount) {
5252
long reqId = reqCount.getAndIncrement();
53-
return doExecuteAsync(request, hostHandle, host, attemptCount, host.connection(), reqId);
53+
return host.connection().thenCompose(c ->
54+
doExecuteAsync(request, hostHandle, host, attemptCount, c, reqId)
55+
.whenComplete((r, t) -> host.release(c)));
5456
}
5557

5658
private CompletableFuture<InternalResponse> doExecuteAsync(

core/src/main/java/com/arangodb/internal/net/ConnectionPool.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.arangodb.config.HostDescription;
2424

2525
import java.io.Closeable;
26+
import java.util.concurrent.CompletableFuture;
2627

2728
/**
2829
* @author Mark Vollmary
@@ -31,7 +32,9 @@ public interface ConnectionPool extends Closeable {
3132

3233
Connection createConnection(final HostDescription host);
3334

34-
Connection connection();
35+
CompletableFuture<Connection> connection();
36+
37+
void release(final Connection connection);
3538

3639
void setJwt(String jwt);
3740

core/src/main/java/com/arangodb/internal/net/ConnectionPoolImpl.java

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,28 @@
2323
import com.arangodb.ArangoDBException;
2424
import com.arangodb.config.HostDescription;
2525
import com.arangodb.internal.config.ArangoConfig;
26+
import com.arangodb.internal.util.AsyncQueue;
2627

2728
import java.io.IOException;
2829
import java.util.ArrayList;
2930
import java.util.List;
31+
import java.util.concurrent.CompletableFuture;
3032

3133
/**
3234
* @author Mark Vollmary
3335
*/
3436
public class ConnectionPoolImpl implements ConnectionPool {
3537

38+
public static final int HTTP1_PIPELINING_LIMIT = 10;
39+
public static final int HTTP2_STREAMS = 32; // hard-coded, see BTS-2049
40+
41+
private final AsyncQueue<Connection> slots = new AsyncQueue<>();
3642
private final HostDescription host;
3743
private final ArangoConfig config;
3844
private final int maxConnections;
3945
private final List<Connection> connections;
4046
private final ConnectionFactory factory;
41-
private int current;
47+
private final int maxSlots;
4248
private volatile String jwt = null;
4349
private boolean closed = false;
4450

@@ -49,7 +55,14 @@ public ConnectionPoolImpl(final HostDescription host, final ArangoConfig config,
4955
this.maxConnections = config.getMaxConnections();
5056
this.factory = factory;
5157
connections = new ArrayList<>();
52-
current = 0;
58+
switch (config.getProtocol()) {
59+
case HTTP_JSON:
60+
case HTTP_VPACK:
61+
maxSlots = config.getPipelining() ? HTTP1_PIPELINING_LIMIT : 1;
62+
break;
63+
default:
64+
maxSlots = HTTP2_STREAMS;
65+
}
5366
}
5467

5568
@Override
@@ -60,23 +73,25 @@ public Connection createConnection(final HostDescription host) {
6073
}
6174

6275
@Override
63-
public synchronized Connection connection() {
76+
public synchronized CompletableFuture<Connection> connection() {
6477
if (closed) {
6578
throw new ArangoDBException("Connection pool already closed!");
6679
}
6780

68-
final Connection connection;
69-
7081
if (connections.size() < maxConnections) {
71-
connection = createConnection(host);
82+
Connection connection = createConnection(host);
7283
connections.add(connection);
73-
current++;
74-
} else {
75-
final int index = Math.floorMod(current++, connections.size());
76-
connection = connections.get(index);
84+
for (int i = 0; i < maxSlots; i++) {
85+
slots.offer((connection));
86+
}
7787
}
7888

79-
return connection;
89+
return slots.poll();
90+
}
91+
92+
@Override
93+
public void release(Connection connection) {
94+
slots.offer(connection);
8095
}
8196

8297
@Override
@@ -101,7 +116,7 @@ public synchronized void close() throws IOException {
101116
@Override
102117
public String toString() {
103118
return "ConnectionPoolImpl [host=" + host + ", maxConnections=" + maxConnections + ", connections="
104-
+ connections.size() + ", current=" + current + ", factory=" + factory.getClass().getSimpleName() + "]";
119+
+ connections.size() + ", factory=" + factory.getClass().getSimpleName() + "]";
105120
}
106121

107122
}

core/src/main/java/com/arangodb/internal/net/Host.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.arangodb.config.HostDescription;
2525

2626
import java.io.IOException;
27+
import java.util.concurrent.CompletableFuture;
2728

2829
/**
2930
* @author Mark Vollmary
@@ -33,7 +34,9 @@ public interface Host {
3334

3435
HostDescription getDescription();
3536

36-
Connection connection();
37+
CompletableFuture<Connection> connection();
38+
39+
void release(Connection c);
3740

3841
void closeOnError();
3942

@@ -44,5 +47,4 @@ public interface Host {
4447
void setMarkforDeletion(boolean markforDeletion);
4548

4649
void setJwt(String jwt);
47-
4850
}

core/src/main/java/com/arangodb/internal/net/HostImpl.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.arangodb.config.HostDescription;
2525

2626
import java.io.IOException;
27+
import java.util.concurrent.CompletableFuture;
2728

2829
/**
2930
* @author Mark Vollmary
@@ -51,10 +52,15 @@ public HostDescription getDescription() {
5152
}
5253

5354
@Override
54-
public Connection connection() {
55+
public CompletableFuture<Connection> connection() {
5556
return connectionPool.connection();
5657
}
5758

59+
@Override
60+
public void release(Connection c) {
61+
connectionPool.release(c);
62+
}
63+
5864
@Override
5965
public void closeOnError() {
6066
try {
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.arangodb.internal.util;
2+
3+
import java.util.ArrayDeque;
4+
import java.util.Queue;
5+
import java.util.concurrent.CompletableFuture;
6+
7+
public class AsyncQueue<T> {
8+
private final Queue<CompletableFuture<T>> requests = new ArrayDeque<>();
9+
private final Queue<T> offers = new ArrayDeque<>();
10+
11+
public synchronized CompletableFuture<T> poll() {
12+
CompletableFuture<T> r = new CompletableFuture<>();
13+
T o = offers.poll();
14+
if (o != null) {
15+
r.complete(o);
16+
} else {
17+
requests.add(r);
18+
}
19+
return r;
20+
}
21+
22+
public synchronized void offer(T o) {
23+
CompletableFuture<T> r = requests.poll();
24+
if (r != null) {
25+
r.complete(o);
26+
} else {
27+
offers.add(o);
28+
}
29+
}
30+
}

http-protocol/src/main/java/com/arangodb/http/HttpConnection.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@
6363
import java.util.concurrent.TimeUnit;
6464
import java.util.concurrent.atomic.AtomicInteger;
6565

66+
import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP1_PIPELINING_LIMIT;
67+
import static com.arangodb.internal.net.ConnectionPoolImpl.HTTP2_STREAMS;
68+
6669

6770
/**
6871
* @author Mark Vollmary
@@ -88,7 +91,6 @@ private static String getUserAgent() {
8891
}
8992

9093
HttpConnection(final ArangoConfig config, final HostDescription host, final HttpProtocolConfig protocolConfig) {
91-
super();
9294
Protocol protocol = config.getProtocol();
9395
ContentType contentType = ContentTypeFactory.of(protocol);
9496
if (contentType == ContentType.VPACK) {
@@ -148,7 +150,9 @@ private static String getUserAgent() {
148150
.setLogActivity(true)
149151
.setKeepAlive(true)
150152
.setTcpKeepAlive(true)
151-
.setPipelining(true)
153+
.setPipelining(config.getPipelining())
154+
.setPipeliningLimit(HTTP1_PIPELINING_LIMIT)
155+
.setHttp2MultiplexingLimit(HTTP2_STREAMS)
152156
.setReuseAddress(true)
153157
.setReusePort(true)
154158
.setHttp2ClearTextUpgrade(false)
@@ -273,7 +277,7 @@ public CompletableFuture<InternalResponse> executeAsync(@UnstableApi final Inter
273277
return rfuture;
274278
}
275279

276-
public void doExecute(@UnstableApi final InternalRequest request, @UnstableApi final CompletableFuture<InternalResponse> rfuture) {
280+
private void doExecute(@UnstableApi final InternalRequest request, @UnstableApi final CompletableFuture<InternalResponse> rfuture) {
277281
String path = buildUrl(request);
278282
HttpRequest<Buffer> httpRequest = client
279283
.request(requestTypeToHttpMethod(request.getRequestType()), path)

test-functional/src/test/java/com/arangodb/internal/HostHandlerTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import java.util.Collections;
3030
import java.util.List;
31+
import java.util.concurrent.CompletableFuture;
3132

3233
import static org.assertj.core.api.Assertions.assertThat;
3334
import static org.junit.jupiter.api.Assertions.fail;
@@ -44,10 +45,15 @@ public Connection createConnection(HostDescription host) {
4445
}
4546

4647
@Override
47-
public Connection connection() {
48+
public CompletableFuture<Connection> connection() {
4849
return null;
4950
}
5051

52+
@Override
53+
public void release(Connection connection) {
54+
55+
}
56+
5157
@Override
5258
public void setJwt(String jwt) {
5359

test-functional/src/test/resources/simplelogger.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,5 @@ org.slf4j.simpleLogger.defaultLogLevel=info
1010
#org.slf4j.simpleLogger.log.com.arangodb.internal.serde.JacksonUtils=debug
1111
#org.slf4j.simpleLogger.log.com.arangodb.internal.net.Communication=debug
1212
#org.slf4j.simpleLogger.log.com.arangodb.internal.serde.InternalSerdeImpl=debug
13+
#org.slf4j.simpleLogger.log.io.netty.handler.logging.LoggingHandler=debug
14+
#org.slf4j.simpleLogger.log.io.netty.handler.codec.http2.Http2FrameLogger=debug

test-non-functional/src/test/java/concurrency/ConnectionLoadBalanceTest.java

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22

33
import com.arangodb.*;
44
import com.arangodb.config.ArangoConfigProperties;
5+
import com.arangodb.internal.net.ConnectionPoolImpl;
56
import org.junit.jupiter.params.ParameterizedTest;
67
import org.junit.jupiter.params.provider.Arguments;
78
import org.junit.jupiter.params.provider.MethodSource;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
811
import util.TestUtils;
912

1013
import java.time.Duration;
@@ -16,12 +19,13 @@
1619
import static org.awaitility.Awaitility.await;
1720

1821
public class ConnectionLoadBalanceTest {
22+
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionLoadBalanceTest.class);
1923

2024
public static Stream<Arguments> configs() {
2125
return Stream.of(
22-
// FIXME: DE-1017
23-
// new Config(Protocol.VST, 1),
24-
// new Config(Protocol.VST, 2),
26+
// FIXME: DE-1017
27+
// new Config(Protocol.VST, 1),
28+
// new Config(Protocol.VST, 2),
2529
new Config(Protocol.HTTP_JSON, 10),
2630
new Config(Protocol.HTTP_JSON, 20),
2731
new Config(Protocol.HTTP2_JSON, 1),
@@ -32,7 +36,7 @@ public static Stream<Arguments> configs() {
3236
// Test the requests load balancing across different connections, when all the slots except 1 are busy
3337
@MethodSource("configs")
3438
@ParameterizedTest
35-
void loadBalanceToFreeConnection(Config cfg) throws InterruptedException {
39+
void loadBalanceToAvailableSlots(Config cfg) throws InterruptedException {
3640
doTestLoadBalance(cfg, 1);
3741
}
3842

@@ -66,20 +70,30 @@ void doTestLoadBalance(Config cfg, int sleepCycles) throws InterruptedException
6670

6771
CompletableFuture<Void> shortRunningTasks = CompletableFuture.allOf(
6872
IntStream.range(0, shortTasksCount)
69-
.mapToObj(__ -> db.query("RETURN 1", Integer.class))
73+
.mapToObj(__ -> db.getVersion())
7074
.toArray(CompletableFuture[]::new)
7175
);
7276

77+
LOGGER.debug("awaiting...");
78+
7379
await()
7480
.timeout(Duration.ofSeconds(sleepDuration * sleepCycles - 1L))
7581
.until(shortRunningTasks::isDone);
7682

83+
LOGGER.debug("completed shortRunningTasks");
84+
85+
// join exceptional completions
86+
shortRunningTasks.join();
87+
7788
await()
7889
.timeout(Duration.ofSeconds(sleepDuration * sleepCycles + 1L))
7990
.until(longRunningTasks::isDone);
8091

81-
shortRunningTasks.join();
92+
LOGGER.debug("completed longRunningTasks");
93+
94+
// join exceptional completions
8295
longRunningTasks.join();
96+
8397
db.arango().shutdown();
8498
}
8599

@@ -90,7 +104,7 @@ private record Config(
90104
int maxStreams() {
91105
return switch (protocol) {
92106
case HTTP_JSON, HTTP_VPACK -> 1;
93-
default -> 32;
107+
default -> ConnectionPoolImpl.HTTP2_STREAMS;
94108
};
95109
}
96110
}

test-non-functional/src/test/resources/arangodb-config-test.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ adb.timeout=9876
77
adb.useSsl=true
88
adb.verifyHost=false
99
adb.chunkSize=1234
10+
adb.pipelining=true
1011
adb.maxConnections=123
1112
adb.connectionTtl=12345
1213
adb.keepAliveInterval=123456

test-non-functional/src/test/resources/simplelogger.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,5 @@ org.slf4j.simpleLogger.showShortLogName=false
99
org.slf4j.simpleLogger.defaultLogLevel=info
1010
#org.slf4j.simpleLogger.log.com.arangodb.internal.serde.JacksonUtils=debug
1111
#org.slf4j.simpleLogger.log.com.arangodb.internal.net.Communication=debug
12+
#org.slf4j.simpleLogger.log.io.netty.handler.logging.LoggingHandler=debug
13+
#org.slf4j.simpleLogger.log.io.netty.handler.codec.http2.Http2FrameLogger=debug

0 commit comments

Comments
 (0)