Skip to content

Active Failover Asynchronous driver #381

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Apr 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ jobs:
topology:
- single
- cluster
- active-failover

steps:
- uses: actions/checkout@v1
Expand Down
2 changes: 2 additions & 0 deletions ChangeLog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/) a

## [Unreleased]

- fixed active failover behavior for the asynchronous driver

## [6.10.0] - 2021-03-27

- closing VST connection after 3 consecutive keepAlive failures (#ES-837)
Expand Down
7 changes: 7 additions & 0 deletions docker/clean_active-failover.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/bin/bash

for c in server1 \
server2 \
server3; do
docker rm -f $c
done
70 changes: 70 additions & 0 deletions docker/start_db_active-failover.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
#!/bin/bash

# USAGE:
# export ARANGO_LICENSE_KEY=<arangodb-enterprise-license>
# ./start_active-failover.sh <dockerImage>

# EXAMPLE:
# ./start_active-failover.sh docker.io/arangodb/arangodb:3.7.10

docker pull "$1"

LOCATION=$(pwd)/$(dirname "$0")

docker network create arangodb --subnet 172.28.0.0/16

echo "Averysecretword" >"$LOCATION"/jwtSecret
docker run --rm -v "$LOCATION"/jwtSecret:/jwtSecret "$1" arangodb auth header --auth.jwt-secret /jwtSecret >"$LOCATION"/jwtHeader
AUTHORIZATION_HEADER=$(cat "$LOCATION"/jwtHeader)

echo "Starting containers..."

docker run -d -v "$LOCATION"/jwtSecret:/jwtSecret -e ARANGO_LICENSE_KEY="$ARANGO_LICENSE_KEY" --network arangodb --ip 172.28.3.1 --name server1 "$1" sh -c 'arangodb --starter.address=$(hostname -i) --starter.mode=activefailover --starter.join server1,server2,server3 --auth.jwt-secret /jwtSecret'
docker run -d -v "$LOCATION"/jwtSecret:/jwtSecret -e ARANGO_LICENSE_KEY="$ARANGO_LICENSE_KEY" --network arangodb --ip 172.28.3.2 --name server2 "$1" sh -c 'arangodb --starter.address=$(hostname -i) --starter.mode=activefailover --starter.join server1,server2,server3 --auth.jwt-secret /jwtSecret'
docker run -d -v "$LOCATION"/jwtSecret:/jwtSecret -e ARANGO_LICENSE_KEY="$ARANGO_LICENSE_KEY" --network arangodb --ip 172.28.3.3 --name server3 "$1" sh -c 'arangodb --starter.address=$(hostname -i) --starter.mode=activefailover --starter.join server1,server2,server3 --auth.jwt-secret /jwtSecret'

debug_container() {
running=$(docker inspect -f '{{.State.Running}}' "$1")

if [ "$running" = false ]; then
echo "$1 is not running!"
echo "---"
docker logs "$1"
echo "---"
exit 1
fi
}

debug() {
for c in server1 \
server2 \
server3; do
debug_container $c
done
}

wait_server() {
# shellcheck disable=SC2091
until $(curl --output /dev/null --silent --head --fail -i -H "$AUTHORIZATION_HEADER" "http://$1/_api/version"); do
printf '.'
debug
sleep 1
done
}

echo "Waiting..."

# Wait for agents:
for a in 172.28.3.1:8529 \
172.28.3.2:8529 \
172.28.3.3:8529; do
wait_server $a
done

docker exec server1 arangosh --server.authentication=false --javascript.execute-string='require("org/arangodb/users").update("root", "test")'
docker exec server2 arangosh --server.authentication=false --javascript.execute-string='require("org/arangodb/users").update("root", "test")'
docker exec server3 arangosh --server.authentication=false --javascript.execute-string='require("org/arangodb/users").update("root", "test")'

#rm "$LOCATION"/jwtHeader "$LOCATION"/jwtSecret

echo "Done, your cluster is ready."
16 changes: 16 additions & 0 deletions docker/start_db_active-failover_retry_fail.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#!/bin/bash

# USAGE:
# export ARANGO_LICENSE_KEY=<arangodb-enterprise-license>
# ./docker/start_db_active-failover_retry_fail.sh <dockerImage>

# EXAMPLE:
# ./docker/start_db_active-failover_retry_fail.sh docker.io/arangodb/arangodb:3.7.10

./docker/start_db_active-failover.sh "$1"
while [ $? -ne 0 ]; do
echo "=== === ==="
echo "active-failover startup failed, retrying ..."
./docker/clean_active-failover.sh
./docker/start_db_active-failover.sh "$1"
done
3 changes: 3 additions & 0 deletions src/main/java/com/arangodb/ArangoDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ public Builder keepAliveInterval(final Integer keepAliveInterval) {
/**
* Whether or not the driver should acquire a list of available coordinators in an ArangoDB cluster or a single
* server with active failover.
* In case of Active-Failover deployment set to {@code true} to enable automatic master discovery.
*
* <p>
* The host list will be used for failover and load balancing.
Expand All @@ -312,6 +313,8 @@ public Builder acquireHostListInterval(final Integer acquireHostListInterval) {

/**
* Sets the load balancing strategy to be used in an ArangoDB cluster setup.
* In case of Active-Failover deployment set to {@link LoadBalancingStrategy#NONE} or not set at all, since that
* would be the default.
*
* @param loadBalancingStrategy the load balancing strategy to be used (default: {@link LoadBalancingStrategy#NONE}
* @return {@link ArangoDB.Builder}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/arangodb/async/ArangoDBAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,7 @@ public Builder keepAliveInterval(final Integer keepAliveInterval) {
/**
* Whether or not the driver should acquire a list of available coordinators in an ArangoDB cluster or a single
* server with active failover.
* In case of Active-Failover deployment set to {@code true} to enable automatic master discovery.
*
* <p>
* The host list will be used for failover and load balancing.
Expand All @@ -457,6 +458,8 @@ public Builder acquireHostList(final Boolean acquireHostList) {

/**
* Sets the load balancing strategy to be used in an ArangoDB cluster setup.
* In case of Active-Failover deployment set to {@link LoadBalancingStrategy#NONE} or not set at all, since that
* would be the default.
*
* @param loadBalancingStrategy the load balancing strategy to be used (default: {@link LoadBalancingStrategy#NONE}
* @return {@link ArangoDBAsync.Builder}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,11 @@
package com.arangodb.async.internal.velocystream;

import com.arangodb.ArangoDBException;
import com.arangodb.entity.ErrorEntity;
import com.arangodb.internal.net.ArangoDBRedirectException;
import com.arangodb.internal.net.HostDescription;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.net.HostHandler;
import com.arangodb.internal.util.HostUtils;
import com.arangodb.internal.velocystream.VstCommunication;
import com.arangodb.internal.velocystream.internal.AuthenticationRequest;
import com.arangodb.internal.velocystream.internal.Message;
Expand Down Expand Up @@ -58,23 +61,37 @@ protected CompletableFuture<Response> execute(final Request request, final VstCo
final Message message = createMessage(request);
send(message, connection).whenComplete((m, ex) -> {
if (m != null) {
final Response response;
try {
final Response response = createResponse(m);
if (response.getResponseCode() >= 300) {
if (response.getBody() != null) {
final ErrorEntity errorEntity = util.deserialize(response.getBody(), ErrorEntity.class);
rfuture.completeExceptionally(new ArangoDBException(errorEntity));
} else {
rfuture.completeExceptionally(new ArangoDBException(
String.format("Response Code: %s", response.getResponseCode()), response.getResponseCode()));
}
} else {
rfuture.complete(response);
}
response = createResponse(m);
} catch (final VPackParserException e) {
LOGGER.error(e.getMessage(), e);
rfuture.completeExceptionally(e);
return;
}

try {
checkError(response);
} catch (final ArangoDBRedirectException e) {
final String location = e.getLocation();
final HostDescription redirectHost = HostUtils.createFromLocation(location);
hostHandler.closeCurrentOnError();
hostHandler.fail();
execute(request, new HostHandle().setHost(redirectHost))
.whenComplete((v, err) -> {
if (v != null) {
rfuture.complete(v);
} else if (err != null) {
rfuture.completeExceptionally(err);
} else {
rfuture.cancel(true);
}
});
return;
} catch (ArangoDBException e) {
rfuture.completeExceptionally(e);
}
rfuture.complete(response);
} else if (ex != null) {
LOGGER.error(ex.getMessage(), ex);
rfuture.completeExceptionally(ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public synchronized Connection connection() {
}

@Override
public void close() throws IOException {
public synchronized void close() throws IOException {
for (final Connection connection : connections) {
connection.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,9 @@
import com.arangodb.ArangoDBException;
import com.arangodb.internal.ArangoDefaults;
import com.arangodb.internal.net.AccessType;
import com.arangodb.internal.net.ArangoDBRedirectException;
import com.arangodb.internal.net.Host;
import com.arangodb.internal.net.HostDescription;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.net.HostHandler;
import com.arangodb.internal.util.HostUtils;
import com.arangodb.internal.util.RequestUtils;
import com.arangodb.internal.util.ResponseUtils;
import com.arangodb.internal.velocystream.internal.Chunk;
Expand Down Expand Up @@ -64,7 +61,7 @@ public abstract class VstCommunication<R, C extends VstConnection> implements Cl
protected final String password;

protected final Integer chunksize;
private final HostHandler hostHandler;
protected final HostHandler hostHandler;

protected VstCommunication(final Integer timeout, final String user, final String password, final Boolean useSsl,
final SSLContext sslContext, final ArangoSerialization util, final Integer chunksize,
Expand Down Expand Up @@ -134,20 +131,8 @@ public void close() throws IOException {
}

public R execute(final Request request, final HostHandle hostHandle) throws ArangoDBException {
try {
final C connection = connect(hostHandle, RequestUtils.determineAccessType(request));
return execute(request, connection);
} catch (final ArangoDBException e) {
if (e instanceof ArangoDBRedirectException) {
final String location = ((ArangoDBRedirectException) e).getLocation();
final HostDescription redirectHost = HostUtils.createFromLocation(location);
hostHandler.closeCurrentOnError();
hostHandler.fail();
return execute(request, new HostHandle().setHost(redirectHost));
} else {
throw e;
}
}
final C connection = connect(hostHandle, RequestUtils.determineAccessType(request));
return execute(request, connection);
}

protected abstract R execute(final Request request, C connection) throws ArangoDBException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
package com.arangodb.internal.velocystream;

import com.arangodb.ArangoDBException;
import com.arangodb.internal.net.ArangoDBRedirectException;
import com.arangodb.internal.net.HostDescription;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.net.HostHandler;
import com.arangodb.internal.util.HostUtils;
import com.arangodb.internal.velocystream.internal.AuthenticationRequest;
import com.arangodb.internal.velocystream.internal.Message;
import com.arangodb.internal.velocystream.internal.VstConnectionSync;
Expand Down Expand Up @@ -127,6 +131,12 @@ protected Response execute(final Request request, final VstConnectionSync connec
return response;
} catch (final VPackParserException e) {
throw new ArangoDBException(e);
} catch (final ArangoDBRedirectException e) {
final String location = e.getLocation();
final HostDescription redirectHost = HostUtils.createFromLocation(location);
hostHandler.closeCurrentOnError();
hostHandler.fail();
return execute(request, new HostHandle().setHost(redirectHost));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void cancel(final long messageId) {
}
}

public void clear(final Exception e) {
public synchronized void clear(final Exception e) {
if (!task.isEmpty()) {
LOGGER.error(e.getMessage(), e);
}
Expand All @@ -96,7 +96,7 @@ public void clear(final Exception e) {
task.clear();
}

public void clear() {
public synchronized void clear() {
for (final Entry<Long, FutureTask<Message>> entry : task.entrySet()) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("Cancel Message (id=%s).", entry.getKey()));
Expand Down
Loading