Skip to content

Commit 8902892

Browse files
authored
[DE-82] Feature/overload-control (#419)
* CircularFifoQueue * CircularFifoQueue.getAvg() * created QueueTimeMetrics interfaces * queue time metrics tests * queue time metrics config * faster queue time metrics tests * fixed gh actions artifact upload * fixed gh actions artifact upload * fixed gh actions artifact upload * "X-Arango-Queue-Time-Seconds" header set in requests
1 parent 609c113 commit 8902892

22 files changed

+548
-77
lines changed

.github/workflows/maven.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ jobs:
8383
if: ${{ cancelled() || failure() }}
8484
uses: actions/upload-artifact@master
8585
with:
86-
name: logs.tgz
86+
name: logs-${{github.job}}.tgz
8787
path: ./logs.tgz
8888

8989
# test encodeURIComponent() and normalize('NFC') comparing to Javascript behavior

src/main/java/com/arangodb/ArangoDB.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,17 @@ public Builder loadBalancingStrategy(final LoadBalancingStrategy loadBalancingSt
326326
return this;
327327
}
328328

329+
/**
330+
* Setting the amount of samples kept for queue time metrics
331+
*
332+
* @param responseQueueTimeSamples amount of samples to keep
333+
* @return {@link ArangoDB.Builder}
334+
*/
335+
public Builder responseQueueTimeSamples(final Integer responseQueueTimeSamples) {
336+
setResponseQueueTimeSamples(responseQueueTimeSamples);
337+
return this;
338+
}
339+
329340
/**
330341
* Register a custom {@link VPackSerializer} for a specific type to be used within the internal serialization
331342
* process.
@@ -658,7 +669,8 @@ public synchronized ArangoDB build() {
658669
protocol,
659670
hostResolver,
660671
hostHandler,
661-
new ArangoContext());
672+
new ArangoContext(),
673+
responseQueueTimeSamples, timeout);
662674
}
663675

664676
}
@@ -705,6 +717,11 @@ default ArangoDatabase db(String name) {
705717
*/
706718
ArangoDatabase db(DbName dbName);
707719

720+
/**
721+
* @return entry point for accessing client metrics
722+
*/
723+
ArangoMetrics metrics();
724+
708725
/**
709726
* Creates a new database with the given name.
710727
*
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* DISCLAIMER
3+
*
4+
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
* Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
*/
20+
21+
package com.arangodb;
22+
23+
/**
24+
* Interface for accessing metrics.
25+
*
26+
* @author Michele Rastelli
27+
* @since ArangoDB 3.9
28+
*/
29+
public interface ArangoMetrics {
30+
/**
31+
* @return queue time metrics
32+
*/
33+
QueueTimeMetrics getQueueTime();
34+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* DISCLAIMER
3+
*
4+
* Copyright 2016 ArangoDB GmbH, Cologne, Germany
5+
*
6+
* Licensed under the Apache License, Version 2.0 (the "License");
7+
* you may not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*
18+
* Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
*/
20+
21+
package com.arangodb;
22+
23+
import com.arangodb.model.QueueTimeSample;
24+
25+
/**
26+
* Interface for accessing queue time latency metrics, reported by the "X-Arango-Queue-Time-Seconds" response header.
27+
* This header contains the most recent request (de)queuing time (in seconds) as tracked by the server’s scheduler.
28+
*
29+
* @author Michele Rastelli
30+
* @see <a href="https://www.arangodb.com/docs/stable/http/general.html#overload-control">API Documentation</a>
31+
* @since ArangoDB 3.9
32+
*/
33+
public interface QueueTimeMetrics {
34+
35+
/**
36+
* @return all the n values observed
37+
*/
38+
QueueTimeSample[] getValues();
39+
40+
/**
41+
* @return the average of the last n values observed, 0.0 if no value has been observed (i.e. in ArangoDB versions
42+
* prior to 3.9).
43+
*/
44+
double getAvg();
45+
}

src/main/java/com/arangodb/async/ArangoDBAsync.java

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,7 @@
2020

2121
package com.arangodb.async;
2222

23-
import com.arangodb.ArangoDBException;
24-
import com.arangodb.ArangoSerializationAccessor;
25-
import com.arangodb.DbName;
26-
import com.arangodb.Protocol;
23+
import com.arangodb.*;
2724
import com.arangodb.async.internal.ArangoDBAsyncImpl;
2825
import com.arangodb.async.internal.velocystream.VstCommunicationAsync;
2926
import com.arangodb.async.internal.velocystream.VstConnectionFactoryAsync;
@@ -110,6 +107,11 @@ default ArangoDatabaseAsync db(final String name) {
110107
*/
111108
ArangoDatabaseAsync db(final DbName dbName);
112109

110+
/**
111+
* @return entry point for accessing client metrics
112+
*/
113+
ArangoMetrics metrics();
114+
113115
/**
114116
* Creates a new database
115117
*
@@ -497,6 +499,17 @@ public Builder acquireHostList(final Boolean acquireHostList) {
497499
return this;
498500
}
499501

502+
/**
503+
* Setting the amount of samples kept for queue time metrics
504+
*
505+
* @param responseQueueTimeSamples amount of samples to keep
506+
* @return {@link ArangoDBAsync.Builder}
507+
*/
508+
public Builder responseQueueTimeSamples(final Integer responseQueueTimeSamples) {
509+
setResponseQueueTimeSamples(responseQueueTimeSamples);
510+
return this;
511+
}
512+
500513
/**
501514
* Sets the load balancing strategy to be used in an ArangoDB cluster setup.
502515
* In case of Active-Failover deployment set to {@link LoadBalancingStrategy#NONE} or not set at all, since that
@@ -838,7 +851,9 @@ public synchronized ArangoDBAsync build() {
838851
syncHostResolver,
839852
asyncHostHandler,
840853
syncHostHandler,
841-
new ArangoContext());
854+
new ArangoContext(),
855+
responseQueueTimeSamples,
856+
timeout);
842857
}
843858

844859
private VstCommunicationAsync.Builder asyncBuilder(final HostHandler hostHandler) {

src/main/java/com/arangodb/async/internal/ArangoDBAsyncImpl.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
package com.arangodb.async.internal;
2222

2323
import com.arangodb.ArangoDBException;
24+
import com.arangodb.ArangoMetrics;
2425
import com.arangodb.DbName;
2526
import com.arangodb.async.ArangoDBAsync;
2627
import com.arangodb.async.ArangoDatabaseAsync;
@@ -68,18 +69,22 @@ public ArangoDBAsyncImpl(
6869
final HostResolver syncHostResolver,
6970
final HostHandler asyncHostHandler,
7071
final HostHandler syncHostHandler,
71-
final ArangoContext context
72+
final ArangoContext context,
73+
final int responseQueueTimeSamples,
74+
final int timeoutMs
7275
) {
7376

74-
super(new ArangoExecutorAsync(asyncCommBuilder.build(util.get(Serializer.INTERNAL)), util, new DocumentCache()), util, context);
77+
super(new ArangoExecutorAsync(asyncCommBuilder.build(util.get(Serializer.INTERNAL)), util, new DocumentCache(),
78+
new QueueTimeMetricsImpl(responseQueueTimeSamples), timeoutMs), util, context);
7579

7680
final VstCommunication<Response, VstConnectionSync> cacheCom = syncCommBuilder.build(util.get(Serializer.INTERNAL));
7781

7882
cp = new VstProtocol(cacheCom);
7983
this.asyncHostHandler = asyncHostHandler;
8084
this.syncHostHandler = syncHostHandler;
8185

82-
ArangoExecutorSync arangoExecutorSync = new ArangoExecutorSync(cp, util, new DocumentCache());
86+
ArangoExecutorSync arangoExecutorSync = new ArangoExecutorSync(cp, util, new DocumentCache(),
87+
new QueueTimeMetricsImpl(responseQueueTimeSamples), timeoutMs);
8388
asyncHostResolver.init(arangoExecutorSync, util.get(Serializer.INTERNAL));
8489
syncHostResolver.init(arangoExecutorSync, util.get(Serializer.INTERNAL));
8590

@@ -121,6 +126,11 @@ public ArangoDatabaseAsync db(final DbName name) {
121126
return new ArangoDatabaseAsyncImpl(this, name);
122127
}
123128

129+
@Override
130+
public ArangoMetrics metrics() {
131+
return new ArangoMetricsImpl(executor.getQueueTimeMetrics());
132+
}
133+
124134
@Override
125135
public CompletableFuture<Boolean> createDatabase(final DbName name) {
126136
return createDatabase(new DBCreateOptions().name(name));

src/main/java/com/arangodb/async/internal/ArangoExecutorAsync.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.arangodb.async.internal.velocystream.VstCommunicationAsync;
2525
import com.arangodb.internal.ArangoExecutor;
2626
import com.arangodb.internal.DocumentCache;
27+
import com.arangodb.internal.QueueTimeMetricsImpl;
2728
import com.arangodb.internal.net.HostHandle;
2829
import com.arangodb.internal.util.ArangoSerializationFactory;
2930
import com.arangodb.velocystream.Request;
@@ -44,8 +45,8 @@ public class ArangoExecutorAsync extends ArangoExecutor {
4445
private final ExecutorService outgoingExecutor = Executors.newSingleThreadExecutor();
4546

4647
public ArangoExecutorAsync(final VstCommunicationAsync communication, final ArangoSerializationFactory util,
47-
final DocumentCache documentCache) {
48-
super(util, documentCache);
48+
final DocumentCache documentCache, final QueueTimeMetricsImpl qtMetrics, final int timeoutMs) {
49+
super(util, documentCache, qtMetrics, timeoutMs);
4950
this.communication = communication;
5051
}
5152

@@ -67,8 +68,11 @@ private <T> CompletableFuture<T> execute(
6768
final HostHandle hostHandle) {
6869

6970
return CompletableFuture.completedFuture(null)
70-
.thenComposeAsync((it) -> communication.execute(request, hostHandle), outgoingExecutor)
71-
.thenApplyAsync(responseDeserializer::deserialize);
71+
.thenComposeAsync((it) -> communication.execute(interceptRequest(request), hostHandle), outgoingExecutor)
72+
.thenApplyAsync(response -> {
73+
interceptResponse(response);
74+
return responseDeserializer.deserialize(response);
75+
});
7276
}
7377

7478
public void disconnect() {

src/main/java/com/arangodb/entity/CursorEntity.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ public Map<String, String> getMeta() {
9797
public Map<String, String> cleanupMeta(Map<String, String> meta) {
9898
meta.remove("Content-Length");
9999
meta.remove("Transfer-Encoding");
100+
meta.remove("X-Arango-Queue-Time-Seconds");
100101
return meta;
101102
}
102103

src/main/java/com/arangodb/internal/ArangoDBImpl.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,12 +61,12 @@ public class ArangoDBImpl extends InternalArangoDB<ArangoExecutorSync> implement
6161

6262
public ArangoDBImpl(final VstCommunicationSync.Builder vstBuilder, final HttpCommunication.Builder httpBuilder,
6363
final ArangoSerializationFactory util, final Protocol protocol, final HostResolver hostResolver,
64-
final HostHandler hostHandler, final ArangoContext context) {
64+
final HostHandler hostHandler, final ArangoContext context, int responseQueueTimeSamples, final int timeoutMs) {
6565

6666
super(new ArangoExecutorSync(
6767
createProtocol(vstBuilder, httpBuilder, util.get(Serializer.INTERNAL), protocol),
6868
util,
69-
new DocumentCache()),
69+
new DocumentCache(), new QueueTimeMetricsImpl(responseQueueTimeSamples), timeoutMs),
7070
util,
7171
context);
7272

@@ -140,6 +140,11 @@ public ArangoDatabase db(final DbName dbName) {
140140
return new ArangoDatabaseImpl(this, dbName).setCursorInitializer(cursorInitializer);
141141
}
142142

143+
@Override
144+
public ArangoMetrics metrics() {
145+
return new ArangoMetricsImpl(executor.getQueueTimeMetrics());
146+
}
147+
143148
@Override
144149
public Boolean createDatabase(final DbName dbName) throws ArangoDBException {
145150
return createDatabase(new DBCreateOptions().name(dbName));

src/main/java/com/arangodb/internal/ArangoDefaults.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,6 @@ private ArangoDefaults() {
5050
public static final boolean DEFAULT_ACQUIRE_HOST_LIST = false;
5151
public static final int DEFAULT_ACQUIRE_HOST_LIST_INTERVAL = 60 * 60 * 1000; // hour
5252
public static final LoadBalancingStrategy DEFAULT_LOAD_BALANCING_STRATEGY = LoadBalancingStrategy.NONE;
53+
public static final int DEFAULT_RESPONSE_QUEUE_TIME_SAMPLES = 10;
5354

5455
}

src/main/java/com/arangodb/internal/ArangoErrors.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,6 @@ private ArangoErrors() {
3232
public static final Integer ERROR_ARANGO_DATA_SOURCE_NOT_FOUND = 1203;
3333
public static final Integer ERROR_ARANGO_DATABASE_NOT_FOUND = 1228;
3434
public static final Integer ERROR_GRAPH_NOT_FOUND = 1924;
35+
public static final Integer QUEUE_TIME_VIOLATED = 21004;
3536

3637
}

src/main/java/com/arangodb/internal/ArangoExecutor.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020

2121
package com.arangodb.internal;
2222

23+
import com.arangodb.QueueTimeMetrics;
2324
import com.arangodb.entity.Entity;
2425
import com.arangodb.internal.util.ArangoSerializationFactory;
2526
import com.arangodb.internal.util.ArangoSerializationFactory.Serializer;
2627
import com.arangodb.velocypack.exception.VPackException;
28+
import com.arangodb.velocystream.Request;
2729
import com.arangodb.velocystream.Response;
2830

2931
import java.lang.reflect.ParameterizedType;
@@ -69,12 +71,17 @@ private boolean isInternal(final Type type) {
6971
}
7072

7173
private final DocumentCache documentCache;
74+
private final QueueTimeMetricsImpl qtMetrics;
7275
private final ArangoSerializationFactory util;
76+
private final String timeoutS;
7377

74-
protected ArangoExecutor(final ArangoSerializationFactory util, final DocumentCache documentCache) {
78+
protected ArangoExecutor(final ArangoSerializationFactory util, final DocumentCache documentCache,
79+
final QueueTimeMetricsImpl qtMetrics, final int timeoutMs) {
7580
super();
7681
this.documentCache = documentCache;
82+
this.qtMetrics = qtMetrics;
7783
this.util = util;
84+
timeoutS = timeoutMs >= 1000 ? Integer.toString(timeoutMs / 1000) : null;
7885
}
7986

8087
public DocumentCache documentCache() {
@@ -85,4 +92,19 @@ public interface ResponseDeserializer<T> {
8592
T deserialize(Response response) throws VPackException;
8693
}
8794

95+
protected final void interceptResponse(Response response) {
96+
String queueTime = response.getMeta().get("X-Arango-Queue-Time-Seconds");
97+
if (queueTime != null) {
98+
qtMetrics.add(Double.parseDouble(queueTime));
99+
}
100+
}
101+
102+
protected final Request interceptRequest(Request request) {
103+
request.putHeaderParam("X-Arango-Queue-Time-Seconds", timeoutS);
104+
return request;
105+
}
106+
107+
public QueueTimeMetrics getQueueTimeMetrics() {
108+
return qtMetrics;
109+
}
88110
}

src/main/java/com/arangodb/internal/ArangoExecutorSync.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,8 @@ public class ArangoExecutorSync extends ArangoExecutor {
4444
private final CommunicationProtocol protocol;
4545

4646
public ArangoExecutorSync(final CommunicationProtocol protocol, final ArangoSerializationFactory util,
47-
final DocumentCache documentCache) {
48-
super(util, documentCache);
47+
final DocumentCache documentCache, final QueueTimeMetricsImpl qtMetrics, final int timeoutMs) {
48+
super(util, documentCache, qtMetrics, timeoutMs);
4949
this.protocol = protocol;
5050
}
5151

@@ -68,7 +68,8 @@ public <T> T execute(
6868

6969
try {
7070

71-
final Response response = protocol.execute(request, hostHandle);
71+
final Response response = protocol.execute(interceptRequest(request), hostHandle);
72+
interceptResponse(response);
7273
T deserialize = responseDeserializer.deserialize(response);
7374

7475
if (deserialize instanceof MetaAware) {

0 commit comments

Comments
 (0)