Skip to content

Add "ConcurrencyAcquireDuration" metric for apache-client #2912

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Dec 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-ApacheHTTPClient-2ecc813.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"category": "Apache HTTP Client",
"contributor": "",
"type": "feature",
"description": "Add \"ConcurrencyAcquireDuration\" metric for apache-client"
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,13 @@ public final class HttpMetric {
* The time taken to acquire a channel from the connection pool.
*
* <p>For HTTP/1 operations, a channel is equivalent to a TCP connection. For HTTP/2 operations, a channel is equivalent to
* an HTTP/2 stream channel. For both protocols, the time to acquire a new concurrency permit may include the following:
* an HTTP/2 stream channel. For both protocols, the time to acquire a new channel may include the following:
* <ol>
* <li>Awaiting a concurrency permit, as restricted by the client's max concurrency configuration.</li>
* <li>The time to establish a new connection, depending on whether an existing connection is available in the pool or
* not.</li>
* <li>The time taken to perform a TLS handshake/negotiation, if TLS is enabled.</li>
* </ol>
*
* <p>Note: This metric is currently only supported in 'netty-nio-client'.
*/
public static final SdkMetric<Duration> CONCURRENCY_ACQUIRE_DURATION =
metric("ConcurrencyAcquireDuration", Duration.class, MetricLevel.INFO);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static software.amazon.awssdk.http.HttpMetric.LEASED_CONCURRENCY;
import static software.amazon.awssdk.http.HttpMetric.MAX_CONCURRENCY;
import static software.amazon.awssdk.http.HttpMetric.PENDING_CONCURRENCY_ACQUIRES;
import static software.amazon.awssdk.http.apache.internal.conn.ClientConnectionRequestFactory.THREAD_LOCAL_REQUEST_METRIC_COLLECTOR;
import static software.amazon.awssdk.utils.NumericUtils.saturatedCast;

import java.io.IOException;
Expand Down Expand Up @@ -230,7 +231,7 @@ public ExecutableHttpRequest prepareRequest(HttpExecuteRequest request) {
return new ExecutableHttpRequest() {
@Override
public HttpExecuteResponse call() throws IOException {
HttpExecuteResponse executeResponse = execute(apacheRequest);
HttpExecuteResponse executeResponse = execute(apacheRequest, metricCollector);
collectPoolMetric(metricCollector);
return executeResponse;
}
Expand All @@ -249,10 +250,15 @@ public void close() {
cm.shutdown();
}

private HttpExecuteResponse execute(HttpRequestBase apacheRequest) throws IOException {
private HttpExecuteResponse execute(HttpRequestBase apacheRequest, MetricCollector metricCollector) throws IOException {
HttpClientContext localRequestContext = ApacheUtils.newClientContext(requestConfig.proxyConfiguration());
HttpResponse httpResponse = httpClient.execute(apacheRequest, localRequestContext);
return createResponse(httpResponse, apacheRequest);
THREAD_LOCAL_REQUEST_METRIC_COLLECTOR.set(metricCollector);
try {
HttpResponse httpResponse = httpClient.execute(apacheRequest, localRequestContext);
return createResponse(httpResponse, apacheRequest);
} finally {
THREAD_LOCAL_REQUEST_METRIC_COLLECTOR.remove();
}
}

private HttpRequestBase toApacheRequest(HttpExecuteRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,17 @@

package software.amazon.awssdk.http.apache.internal.conn;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpClientConnection;
import org.apache.http.conn.ConnectionRequest;
import org.apache.http.conn.HttpClientConnectionManager;
import org.apache.http.pool.ConnPoolControl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.http.conn.routing.HttpRoute;
import org.apache.http.protocol.HttpContext;
import software.amazon.awssdk.annotations.SdkInternalApi;

@SdkInternalApi
public final class ClientConnectionManagerFactory {
private static final Logger log = LoggerFactory.getLogger(ClientConnectionManagerFactory.class);

private ClientConnectionManagerFactory() {
}
Expand All @@ -40,52 +37,78 @@ private ClientConnectionManagerFactory() {
* @param orig the target instance to be wrapped
*/
public static HttpClientConnectionManager wrap(HttpClientConnectionManager orig) {
if (orig instanceof Wrapped) {
if (orig instanceof DelegatingHttpClientConnectionManager) {
throw new IllegalArgumentException();
}
Class<?>[] interfaces;
if (orig instanceof ConnPoolControl) {
interfaces = new Class<?>[]{
HttpClientConnectionManager.class,
ConnPoolControl.class,
Wrapped.class};
} else {
interfaces = new Class<?>[]{
HttpClientConnectionManager.class,
Wrapped.class
};
return new InstrumentedHttpClientConnectionManager(orig);
}

/**
* Further wraps {@link ConnectionRequest} to capture performance metrics.
*/
private static class InstrumentedHttpClientConnectionManager extends DelegatingHttpClientConnectionManager {

private InstrumentedHttpClientConnectionManager(HttpClientConnectionManager delegate) {
super(delegate);
}

@Override
public ConnectionRequest requestConnection(HttpRoute route, Object state) {
ConnectionRequest connectionRequest = super.requestConnection(route, state);
return ClientConnectionRequestFactory.wrap(connectionRequest);
}
return (HttpClientConnectionManager) Proxy.newProxyInstance(
// https://github.com/aws/aws-sdk-java/pull/48#issuecomment-29454423
ClientConnectionManagerFactory.class.getClassLoader(),
interfaces,
new Handler(orig));
}

/**
* The handler behind the dynamic proxy for {@link HttpClientConnectionManager}
* so that the any returned instance of {@link ConnectionRequest} can
* further wrapped for capturing performance metrics.
* Delegates all methods to {@link HttpClientConnectionManager}. Subclasses can override select methods to change behavior.
*/
private static class Handler implements InvocationHandler {
private final HttpClientConnectionManager orig;
private static class DelegatingHttpClientConnectionManager implements HttpClientConnectionManager {

private final HttpClientConnectionManager delegate;

protected DelegatingHttpClientConnectionManager(HttpClientConnectionManager delegate) {
this.delegate = delegate;
}

@Override
public ConnectionRequest requestConnection(HttpRoute route, Object state) {
return delegate.requestConnection(route, state);
}

@Override
public void releaseConnection(HttpClientConnection conn, Object newState, long validDuration, TimeUnit timeUnit) {
delegate.releaseConnection(conn, newState, validDuration, timeUnit);
}

@Override
public void connect(HttpClientConnection conn, HttpRoute route, int connectTimeout, HttpContext context)
throws IOException {
delegate.connect(conn, route, connectTimeout, context);
}

@Override
public void upgrade(HttpClientConnection conn, HttpRoute route, HttpContext context) throws IOException {
delegate.upgrade(conn, route, context);
}

Handler(HttpClientConnectionManager real) {
this.orig = real;
@Override
public void routeComplete(HttpClientConnection conn, HttpRoute route, HttpContext context) throws IOException {
delegate.routeComplete(conn, route, context);
}

@Override
public void closeIdleConnections(long idletime, TimeUnit timeUnit) {
delegate.closeIdleConnections(idletime, timeUnit);
}

@Override
public void closeExpiredConnections() {
delegate.closeExpiredConnections();
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
try {
Object ret = method.invoke(orig, args);
return ret instanceof ConnectionRequest
? ClientConnectionRequestFactory.wrap((ConnectionRequest) ret)
: ret
;
} catch (InvocationTargetException e) {
log.debug("", e);
throw e.getCause();
}
public void shutdown() {
delegate.shutdown();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,25 @@

package software.amazon.awssdk.http.apache.internal.conn;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.http.HttpClientConnection;
import org.apache.http.conn.ConnectionPoolTimeoutException;
import org.apache.http.conn.ConnectionRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.http.HttpMetric;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.metrics.MetricCollector;

@SdkInternalApi
final class ClientConnectionRequestFactory {
private static final Logger log = LoggerFactory.getLogger(ClientConnectionRequestFactory.class);
private static final Class<?>[] INTERFACES = {
ConnectionRequest.class,
Wrapped.class
};
public final class ClientConnectionRequestFactory {

/**
* {@link ThreadLocal}, request-level {@link MetricCollector}, set and removed by {@link ApacheHttpClient}.
*/
public static final ThreadLocal<MetricCollector> THREAD_LOCAL_REQUEST_METRIC_COLLECTOR = new ThreadLocal<>();

private ClientConnectionRequestFactory() {
}
Expand All @@ -43,48 +46,55 @@ private ClientConnectionRequestFactory() {
* @param orig the target instance to be wrapped
*/
static ConnectionRequest wrap(ConnectionRequest orig) {
if (orig instanceof Wrapped) {
if (orig instanceof DelegatingConnectionRequest) {
throw new IllegalArgumentException();
}
return (ConnectionRequest) Proxy.newProxyInstance(
// https://github.com/aws/aws-sdk-java/pull/48#issuecomment-29454423
ClientConnectionRequestFactory.class.getClassLoader(),
INTERFACES,
new Handler(orig));
return new InstrumentedConnectionRequest(orig);
}

/**
* The handler behind the dynamic proxy for {@link ConnectionRequest}
* so that the latency of the
* {@link ConnectionRequest#get(long, java.util.concurrent.TimeUnit)}
* can be captured.
* Measures the latency of {@link ConnectionRequest#get(long, java.util.concurrent.TimeUnit)}.
*/
private static class Handler implements InvocationHandler {
private final ConnectionRequest orig;
private static class InstrumentedConnectionRequest extends DelegatingConnectionRequest {

Handler(ConnectionRequest orig) {
this.orig = orig;
private InstrumentedConnectionRequest(ConnectionRequest delegate) {
super(delegate);
}

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
public HttpClientConnection get(long timeout, TimeUnit timeUnit) throws InterruptedException, ExecutionException,
ConnectionPoolTimeoutException {
Instant startTime = Instant.now();
try {
// TODO v2 service metrics
// if ("get".equals(method.getName())) {
// ServiceLatencyProvider latencyProvider = new ServiceLatencyProvider(
// AWSServiceMetrics.HttpClientGetConnectionTime);
// try {
// return method.invoke(orig, args);
// } finally {
// AwsSdkMetrics.getServiceMetricCollector()
// .collectLatency(latencyProvider.endTiming());
// }
// }
return method.invoke(orig, args);
} catch (InvocationTargetException e) {
log.debug("", e);
throw e.getCause();
return super.get(timeout, timeUnit);
} finally {
Duration elapsed = Duration.between(startTime, Instant.now());
MetricCollector metricCollector = THREAD_LOCAL_REQUEST_METRIC_COLLECTOR.get();
metricCollector.reportMetric(HttpMetric.CONCURRENCY_ACQUIRE_DURATION, elapsed);
}
}
}

/**
* Delegates all methods to {@link ConnectionRequest}. Subclasses can override select methods to change behavior.
*/
private static class DelegatingConnectionRequest implements ConnectionRequest {

private final ConnectionRequest delegate;

private DelegatingConnectionRequest(ConnectionRequest delegate) {
this.delegate = delegate;
}

@Override
public HttpClientConnection get(long timeout, TimeUnit timeUnit)
throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
return delegate.get(timeout, timeUnit);
}

@Override
public boolean cancel() {
return delegate.cancel();
}
}
}
Loading