Skip to content

[DE-1016] Async connection pool #602

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<relativePath>../release-parent</relativePath>
<groupId>com.arangodb</groupId>
<artifactId>release-parent</artifactId>
<version>7.18.0</version>
<version>7.19.0-SNAPSHOT</version>
</parent>

<name>core</name>
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/com/arangodb/ArangoDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,17 @@ public Builder chunkSize(final Integer chunkSize) {
return this;
}

/**
* Set whether to use requests pipelining in HTTP/1.1 ({@link Protocol#HTTP_JSON} or {@link Protocol#HTTP_VPACK}).
*
* @param pipelining {@code true} if enabled
* @return {@link ArangoDB.Builder}
*/
public Builder pipelining(final Boolean pipelining) {
config.setPipelining(pipelining);
return this;
}

/**
* Sets the maximum number of connections the built in connection pool will open per host.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface ArangoConfigProperties {
String KEY_USE_SSL = "useSsl";
String KEY_VERIFY_HOST = "verifyHost";
String KEY_CHUNK_SIZE = "chunkSize";
String KEY_PIPELINING = "pipelining";
String KEY_MAX_CONNECTIONS = "maxConnections";
String KEY_CONNECTION_TTL = "connectionTtl";
String KEY_KEEP_ALIVE_INTERVAL = "keepAliveInterval";
Expand Down Expand Up @@ -110,6 +111,10 @@ default Optional<Integer> getChunkSize() {
return Optional.empty();
}

default Optional<Boolean> getPipelining() {
return Optional.empty();
}

default Optional<Integer> getMaxConnections() {
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public final class ArangoDefaults {
public static final Boolean DEFAULT_USE_SSL = false;
public static final Boolean DEFAULT_VERIFY_HOST = true;
public static final Integer DEFAULT_CHUNK_SIZE = 30_000;
public static final Boolean DEFAULT_PIPELINING = false;
public static final Boolean DEFAULT_ACQUIRE_HOST_LIST = false;
public static final Integer DEFAULT_ACQUIRE_HOST_LIST_INTERVAL = 60 * 60 * 1000; // hour
public static final LoadBalancingStrategy DEFAULT_LOAD_BALANCING_STRATEGY = LoadBalancingStrategy.NONE;
Expand Down
10 changes: 10 additions & 0 deletions core/src/main/java/com/arangodb/internal/config/ArangoConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public class ArangoConfig {
private SSLContext sslContext;
private Boolean verifyHost;
private Integer chunkSize;
private Boolean pipelining;
private Integer maxConnections;
private Long connectionTtl;
private Integer keepAliveInterval;
Expand Down Expand Up @@ -70,6 +71,7 @@ public void loadProperties(final ArangoConfigProperties properties) {
useSsl = properties.getUseSsl().orElse(ArangoDefaults.DEFAULT_USE_SSL);
verifyHost = properties.getVerifyHost().orElse(ArangoDefaults.DEFAULT_VERIFY_HOST);
chunkSize = properties.getChunkSize().orElse(ArangoDefaults.DEFAULT_CHUNK_SIZE);
pipelining = properties.getPipelining().orElse(ArangoDefaults.DEFAULT_PIPELINING);
// FIXME: make maxConnections field Optional
maxConnections = properties.getMaxConnections().orElse(null);
// FIXME: make connectionTtl field Optional
Expand Down Expand Up @@ -173,6 +175,14 @@ public void setChunkSize(Integer chunkSize) {
this.chunkSize = chunkSize;
}

public Boolean getPipelining() {
return pipelining;
}

public void setPipelining(Boolean pipelining) {
this.pipelining = pipelining;
}

public Integer getMaxConnections() {
if (maxConnections == null) {
maxConnections = getDefaultMaxConnections();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public Optional<Integer> getChunkSize() {
return Optional.ofNullable(getProperty(KEY_CHUNK_SIZE)).map(Integer::valueOf);
}

@Override
public Optional<Boolean> getPipelining() {
return Optional.ofNullable(getProperty(KEY_PIPELINING)).map(Boolean::valueOf);
}

@Override
public Optional<Integer> getMaxConnections() {
return Optional.ofNullable(getProperty(KEY_MAX_CONNECTIONS)).map(Integer::valueOf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ public CompletableFuture<InternalResponse> executeAsync(final InternalRequest re

private CompletableFuture<InternalResponse> executeAsync(final InternalRequest request, final HostHandle hostHandle, final Host host, final int attemptCount) {
long reqId = reqCount.getAndIncrement();
return doExecuteAsync(request, hostHandle, host, attemptCount, host.connection(), reqId);
return host.connection().thenCompose(c ->
doExecuteAsync(request, hostHandle, host, attemptCount, c, reqId)
.whenComplete((r, t) -> c.release()));
}

private CompletableFuture<InternalResponse> doExecuteAsync(
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/com/arangodb/internal/net/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,6 @@ public interface Connection extends Closeable {
void setJwt(String jwt);

CompletableFuture<InternalResponse> executeAsync(InternalRequest request);

void release();
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,5 @@
*/
@UsedInApi
public interface ConnectionFactory {
Connection create(ArangoConfig config, HostDescription host);
Connection create(ArangoConfig config, HostDescription host, ConnectionPool pool);
}
10 changes: 7 additions & 3 deletions core/src/main/java/com/arangodb/internal/net/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,22 @@

package com.arangodb.internal.net;

import com.arangodb.config.HostDescription;
import com.arangodb.arch.UsedInApi;

import java.io.Closeable;
import java.util.concurrent.CompletableFuture;

/**
* @author Mark Vollmary
*/
@UsedInApi
public interface ConnectionPool extends Closeable {

Connection createConnection(final HostDescription host);
Connection createConnection();

Connection connection();
CompletableFuture<Connection> connection();

void release(final Connection connection);

void setJwt(String jwt);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,64 +23,77 @@
import com.arangodb.ArangoDBException;
import com.arangodb.config.HostDescription;
import com.arangodb.internal.config.ArangoConfig;
import com.arangodb.internal.util.AsyncQueue;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* @author Mark Vollmary
*/
public class ConnectionPoolImpl implements ConnectionPool {

public static final int HTTP1_SLOTS = 1; // HTTP/1: max 1 pending request
public static final int HTTP1_SLOTS_PIPELINING = 10; // HTTP/1: max pipelining
public static final int HTTP2_SLOTS = 32; // HTTP/2: max streams, hard-coded see BTS-2049

private final AsyncQueue<Connection> slots = new AsyncQueue<>();
private final HostDescription host;
private final ArangoConfig config;
private final int maxConnections;
private final List<Connection> connections;
private final ConnectionFactory factory;
private int current;
private final int maxSlots;
private volatile String jwt = null;
private boolean closed = false;
private volatile boolean closed = false;

public ConnectionPoolImpl(final HostDescription host, final ArangoConfig config, final ConnectionFactory factory) {
super();
this.host = host;
this.config = config;
this.maxConnections = config.getMaxConnections();
this.factory = factory;
connections = new ArrayList<>();
current = 0;
connections = new CopyOnWriteArrayList<>();
switch (config.getProtocol()) {
case HTTP_JSON:
case HTTP_VPACK:
maxSlots = config.getPipelining() ? HTTP1_SLOTS_PIPELINING : HTTP1_SLOTS;
break;
default:
maxSlots = HTTP2_SLOTS;
}
}

@Override
public Connection createConnection(final HostDescription host) {
Connection c = factory.create(config, host);
public Connection createConnection() {
Connection c = factory.create(config, host, this);
c.setJwt(jwt);
return c;
}

@Override
public synchronized Connection connection() {
public CompletableFuture<Connection> connection() {
if (closed) {
throw new ArangoDBException("Connection pool already closed!");
}

final Connection connection;

if (connections.size() < maxConnections) {
connection = createConnection(host);
Connection connection = createConnection();
connections.add(connection);
current++;
} else {
final int index = Math.floorMod(current++, connections.size());
connection = connections.get(index);
for (int i = 0; i < maxSlots; i++) {
slots.offer((connection));
}
}

return connection;
return slots.poll();
}

@Override
public void release(Connection connection) {
slots.offer(connection);
}

@Override
public synchronized void setJwt(String jwt) {
public void setJwt(String jwt) {
if (jwt != null) {
this.jwt = jwt;
for (Connection connection : connections) {
Expand All @@ -90,18 +103,17 @@ public synchronized void setJwt(String jwt) {
}

@Override
public synchronized void close() throws IOException {
public void close() throws IOException {
closed = true;
for (final Connection connection : connections) {
connection.close();
}
connections.clear();
}

@Override
public String toString() {
return "ConnectionPoolImpl [host=" + host + ", maxConnections=" + maxConnections + ", connections="
+ connections.size() + ", current=" + current + ", factory=" + factory.getClass().getSimpleName() + "]";
+ connections.size() + ", factory=" + factory.getClass().getSimpleName() + "]";
}

}
6 changes: 2 additions & 4 deletions core/src/main/java/com/arangodb/internal/net/Host.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.arangodb.config.HostDescription;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* @author Mark Vollmary
Expand All @@ -33,9 +34,7 @@ public interface Host {

HostDescription getDescription();

Connection connection();

void closeOnError();
CompletableFuture<Connection> connection();

void close() throws IOException;

Expand All @@ -44,5 +43,4 @@ public interface Host {
void setMarkforDeletion(boolean markforDeletion);

void setJwt(String jwt);

}
13 changes: 2 additions & 11 deletions core/src/main/java/com/arangodb/internal/net/HostImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

package com.arangodb.internal.net;

import com.arangodb.ArangoDBException;
import com.arangodb.config.HostDescription;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/**
* @author Mark Vollmary
Expand Down Expand Up @@ -51,19 +51,10 @@ public HostDescription getDescription() {
}

@Override
public Connection connection() {
public CompletableFuture<Connection> connection() {
return connectionPool.connection();
}

@Override
public void closeOnError() {
try {
connectionPool.close();
} catch (final IOException e) {
throw ArangoDBException.of(e);
}
}

@Override
public String toString() {
return "HostImpl [connectionPool=" + connectionPool + ", description=" + description + ", markforDeletion="
Expand Down
45 changes: 45 additions & 0 deletions core/src/main/java/com/arangodb/internal/util/AsyncQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.arangodb.internal.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.*;

public class AsyncQueue<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(AsyncQueue.class);
private final Queue<CompletableFuture<T>> requests = new ConcurrentLinkedQueue<>();
private final Queue<T> offers = new ArrayDeque<>();

public synchronized CompletableFuture<T> poll() {
LOGGER.trace("poll()");
T o = offers.poll();
if (o != null) {
LOGGER.trace("poll(): short-circuit: {}", o);
return CompletableFuture.completedFuture(o);
}
CompletableFuture<T> r = new CompletableFuture<>();
LOGGER.trace("poll(): enqueue request: {}", r);
requests.add(r);
return r;
}

public void offer(T o) {
LOGGER.trace("offer({})", o);
CompletableFuture<T> r = requests.poll();
if (r == null) {
synchronized (this) {
r = requests.poll();
if (r == null) {
LOGGER.trace("offer({}): enqueue", o);
offers.add(o);
}
}
}
if (r != null) {
LOGGER.trace("offer({}): short-circuit: {}", o, r);
r.complete(o);
}
}
}
2 changes: 1 addition & 1 deletion docker/start_db.sh
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ docker run -d \
--starter.address="${GW}" \
--docker.image="${DOCKER_IMAGE}" \
--starter.local --starter.mode=${STARTER_MODE} --all.log.level=debug --all.log.output=+ --log.verbose \
--all.server.descriptors-minimum=1024 --all.javascript.allow-admin-execute=true
--all.server.descriptors-minimum=1024 --all.javascript.allow-admin-execute=true --all.server.maximal-threads=128


wait_server() {
Expand Down
2 changes: 1 addition & 1 deletion driver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<relativePath>../release-parent</relativePath>
<groupId>com.arangodb</groupId>
<artifactId>release-parent</artifactId>
<version>7.18.0</version>
<version>7.19.0-SNAPSHOT</version>
</parent>

<name>arangodb-java-driver</name>
Expand Down
2 changes: 1 addition & 1 deletion http-protocol/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<relativePath>../release-parent</relativePath>
<groupId>com.arangodb</groupId>
<artifactId>release-parent</artifactId>
<version>7.18.0</version>
<version>7.19.0-SNAPSHOT</version>
</parent>

<name>http-protocol</name>
Expand Down
Loading