Skip to content

Support core metrics for async non-streaming and streaming operations #1889

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
"$L" +
".withResponseHandler($L)\n" +
".withErrorResponseHandler(errorResponseHandler)\n" +
".withMetricCollector(apiCallMetricCollector)\n" +
hostPrefixExpression(opModel) +
discoveredEndpoint(opModel) +
asyncRequestBody +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ public CodeBlock asyncExecutionHandler(IntermediateModel intermediateModel, Oper
".withMarshaller($L)" +
".withResponseHandler(responseHandler)" +
".withErrorResponseHandler($N)\n" +
".withMetricCollector(apiCallMetricCollector)\n" +
hostPrefixExpression(opModel) +
asyncRequestBody +
".withInput($L) $L);",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public CodeBlock executionHandler(OperationModel opModel) {
.add("\n\nreturn clientHandler.execute(new $T<$T, $T>()" +
".withOperationName(\"$N\")\n" +
".withCombinedResponseHandler($N)" +
".withMetricCollector(apiCallMetricCollector)\n" +
hostPrefixExpression(opModel) +
discoveredEndpoint(opModel) +
".withInput($L)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,8 @@ public CompletableFuture<APostOperationResponse> aPostOperation(APostOperationRe
.withOperationName("APostOperation")
.withMarshaller(new APostOperationRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.hostPrefixExpression(resolvedHostExpression).withInput(aPostOperationRequest));
.withMetricCollector(apiCallMetricCollector).hostPrefixExpression(resolvedHostExpression)
.withInput(aPostOperationRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = aPostOperationRequest.overrideConfiguration().orElse(null);
executeFuture.whenComplete((r, e) -> {
Optional<MetricPublisher> metricPublisher = MetricUtils.resolvePublisher(clientConfiguration,
Expand Down Expand Up @@ -237,7 +238,7 @@ public CompletableFuture<APostOperationWithOutputResponse> aPostOperationWithOut
.withOperationName("APostOperationWithOutput")
.withMarshaller(new APostOperationWithOutputRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.withInput(aPostOperationWithOutputRequest));
.withMetricCollector(apiCallMetricCollector).withInput(aPostOperationWithOutputRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = aPostOperationWithOutputRequest.overrideConfiguration()
.orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -320,8 +321,8 @@ public CompletableFuture<Void> eventStreamOperation(EventStreamOperationRequest
.withMarshaller(new EventStreamOperationRequestMarshaller(protocolFactory))
.withAsyncRequestBody(software.amazon.awssdk.core.async.AsyncRequestBody.fromPublisher(adapted))
.withFullDuplex(true).withResponseHandler(responseHandler)
.withErrorResponseHandler(errorResponseHandler).withInput(eventStreamOperationRequest),
restAsyncResponseTransformer);
.withErrorResponseHandler(errorResponseHandler).withMetricCollector(apiCallMetricCollector)
.withInput(eventStreamOperationRequest), restAsyncResponseTransformer);
AwsRequestOverrideConfiguration requestOverrideConfig = eventStreamOperationRequest.overrideConfiguration().orElse(
null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -398,7 +399,7 @@ public CompletableFuture<EventStreamOperationWithOnlyInputResponse> eventStreamO
.withMarshaller(new EventStreamOperationWithOnlyInputRequestMarshaller(protocolFactory))
.withAsyncRequestBody(software.amazon.awssdk.core.async.AsyncRequestBody.fromPublisher(adapted))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.withInput(eventStreamOperationWithOnlyInputRequest));
.withMetricCollector(apiCallMetricCollector).withInput(eventStreamOperationWithOnlyInputRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = eventStreamOperationWithOnlyInputRequest
.overrideConfiguration().orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -477,6 +478,7 @@ public CompletableFuture<Void> eventStreamOperationWithOnlyOutput(
.withOperationName("EventStreamOperationWithOnlyOutput")
.withMarshaller(new EventStreamOperationWithOnlyOutputRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.withMetricCollector(apiCallMetricCollector)
.withInput(eventStreamOperationWithOnlyOutputRequest), restAsyncResponseTransformer);
AwsRequestOverrideConfiguration requestOverrideConfig = eventStreamOperationWithOnlyOutputRequest
.overrideConfiguration().orElse(null);
Expand Down Expand Up @@ -547,7 +549,7 @@ public CompletableFuture<GetWithoutRequiredMembersResponse> getWithoutRequiredMe
.withOperationName("GetWithoutRequiredMembers")
.withMarshaller(new GetWithoutRequiredMembersRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.withInput(getWithoutRequiredMembersRequest));
.withMetricCollector(apiCallMetricCollector).withInput(getWithoutRequiredMembersRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = getWithoutRequiredMembersRequest.overrideConfiguration()
.orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -605,7 +607,7 @@ public CompletableFuture<PaginatedOperationWithResultKeyResponse> paginatedOpera
.withOperationName("PaginatedOperationWithResultKey")
.withMarshaller(new PaginatedOperationWithResultKeyRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.withInput(paginatedOperationWithResultKeyRequest));
.withMetricCollector(apiCallMetricCollector).withInput(paginatedOperationWithResultKeyRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = paginatedOperationWithResultKeyRequest
.overrideConfiguration().orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -740,7 +742,7 @@ public CompletableFuture<PaginatedOperationWithoutResultKeyResponse> paginatedOp
.withOperationName("PaginatedOperationWithoutResultKey")
.withMarshaller(new PaginatedOperationWithoutResultKeyRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.withInput(paginatedOperationWithoutResultKeyRequest));
.withMetricCollector(apiCallMetricCollector).withInput(paginatedOperationWithoutResultKeyRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = paginatedOperationWithoutResultKeyRequest
.overrideConfiguration().orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -882,8 +884,8 @@ public CompletableFuture<StreamingInputOperationResponse> streamingInputOperatio
AsyncStreamingRequestMarshaller.builder()
.delegateMarshaller(new StreamingInputOperationRequestMarshaller(protocolFactory))
.asyncRequestBody(requestBody).build()).withResponseHandler(responseHandler)
.withErrorResponseHandler(errorResponseHandler).withAsyncRequestBody(requestBody)
.withInput(streamingInputOperationRequest));
.withErrorResponseHandler(errorResponseHandler).withMetricCollector(apiCallMetricCollector)
.withAsyncRequestBody(requestBody).withInput(streamingInputOperationRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = streamingInputOperationRequest.overrideConfiguration()
.orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -958,8 +960,8 @@ public <ReturnT> CompletableFuture<ReturnT> streamingInputOutputOperation(
new StreamingInputOutputOperationRequestMarshaller(protocolFactory))
.asyncRequestBody(requestBody).transferEncoding(true).build())
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.withAsyncRequestBody(requestBody).withInput(streamingInputOutputOperationRequest),
asyncResponseTransformer);
.withMetricCollector(apiCallMetricCollector).withAsyncRequestBody(requestBody)
.withInput(streamingInputOutputOperationRequest), asyncResponseTransformer);
AwsRequestOverrideConfiguration requestOverrideConfig = streamingInputOutputOperationRequest.overrideConfiguration()
.orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -1028,7 +1030,8 @@ public <ReturnT> CompletableFuture<ReturnT> streamingOutputOperation(
.withOperationName("StreamingOutputOperation")
.withMarshaller(new StreamingOutputOperationRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.withInput(streamingOutputOperationRequest), asyncResponseTransformer);
.withMetricCollector(apiCallMetricCollector).withInput(streamingOutputOperationRequest),
asyncResponseTransformer);
AwsRequestOverrideConfiguration requestOverrideConfig = streamingOutputOperationRequest.overrideConfiguration()
.orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public CompletableFuture<DescribeEndpointsResponse> describeEndpoints(DescribeEn
.withOperationName("DescribeEndpoints")
.withMarshaller(new DescribeEndpointsRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.withInput(describeEndpointsRequest));
.withMetricCollector(apiCallMetricCollector).withInput(describeEndpointsRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = describeEndpointsRequest.overrideConfiguration().orElse(null);
executeFuture.whenComplete((r, e) -> {
Optional<MetricPublisher> metricPublisher = MetricUtils.resolvePublisher(clientConfiguration,
Expand Down Expand Up @@ -176,7 +176,8 @@ public CompletableFuture<TestDiscoveryIdentifiersRequiredResponse> testDiscovery
.withOperationName("TestDiscoveryIdentifiersRequired")
.withMarshaller(new TestDiscoveryIdentifiersRequiredRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.discoveredEndpoint(cachedEndpoint).withInput(testDiscoveryIdentifiersRequiredRequest));
.withMetricCollector(apiCallMetricCollector).discoveredEndpoint(cachedEndpoint)
.withInput(testDiscoveryIdentifiersRequiredRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = testDiscoveryIdentifiersRequiredRequest
.overrideConfiguration().orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -239,7 +240,8 @@ public CompletableFuture<TestDiscoveryOptionalResponse> testDiscoveryOptional(
.withOperationName("TestDiscoveryOptional")
.withMarshaller(new TestDiscoveryOptionalRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.discoveredEndpoint(cachedEndpoint).withInput(testDiscoveryOptionalRequest));
.withMetricCollector(apiCallMetricCollector).discoveredEndpoint(cachedEndpoint)
.withInput(testDiscoveryOptionalRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = testDiscoveryOptionalRequest.overrideConfiguration().orElse(
null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -302,7 +304,8 @@ public CompletableFuture<TestDiscoveryRequiredResponse> testDiscoveryRequired(
.withOperationName("TestDiscoveryRequired")
.withMarshaller(new TestDiscoveryRequiredRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.discoveredEndpoint(cachedEndpoint).withInput(testDiscoveryRequiredRequest));
.withMetricCollector(apiCallMetricCollector).discoveredEndpoint(cachedEndpoint)
.withInput(testDiscoveryRequiredRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = testDiscoveryRequiredRequest.overrideConfiguration().orElse(
null);
executeFuture.whenComplete((r, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public CompletableFuture<APostOperationResponse> aPostOperation(APostOperationRe
.withOperationName("APostOperation")
.withMarshaller(new APostOperationRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.hostPrefixExpression(resolvedHostExpression).withInput(aPostOperationRequest));
.withMetricCollector(apiCallMetricCollector).hostPrefixExpression(resolvedHostExpression)
.withInput(aPostOperationRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = aPostOperationRequest.overrideConfiguration().orElse(null);
executeFuture.whenComplete((r, e) -> {
Optional<MetricPublisher> metricPublisher = MetricUtils.resolvePublisher(clientConfiguration,
Expand Down Expand Up @@ -166,7 +167,7 @@ public CompletableFuture<APostOperationWithOutputResponse> aPostOperationWithOut
.withOperationName("APostOperationWithOutput")
.withMarshaller(new APostOperationWithOutputRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.withInput(aPostOperationWithOutputRequest));
.withMetricCollector(apiCallMetricCollector).withInput(aPostOperationWithOutputRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = aPostOperationWithOutputRequest.overrideConfiguration()
.orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -227,8 +228,8 @@ public CompletableFuture<StreamingInputOperationResponse> streamingInputOperatio
AsyncStreamingRequestMarshaller.builder()
.delegateMarshaller(new StreamingInputOperationRequestMarshaller(protocolFactory))
.asyncRequestBody(requestBody).build()).withResponseHandler(responseHandler)
.withErrorResponseHandler(errorResponseHandler).withAsyncRequestBody(requestBody)
.withInput(streamingInputOperationRequest));
.withErrorResponseHandler(errorResponseHandler).withMetricCollector(apiCallMetricCollector)
.withAsyncRequestBody(requestBody).withInput(streamingInputOperationRequest));
AwsRequestOverrideConfiguration requestOverrideConfig = streamingInputOperationRequest.overrideConfiguration()
.orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down Expand Up @@ -288,7 +289,8 @@ public <ReturnT> CompletableFuture<ReturnT> streamingOutputOperation(
.withOperationName("StreamingOutputOperation")
.withMarshaller(new StreamingOutputOperationRequestMarshaller(protocolFactory))
.withResponseHandler(responseHandler).withErrorResponseHandler(errorResponseHandler)
.withInput(streamingOutputOperationRequest), asyncResponseTransformer);
.withMetricCollector(apiCallMetricCollector).withInput(streamingOutputOperationRequest),
asyncResponseTransformer);
AwsRequestOverrideConfiguration requestOverrideConfig = streamingOutputOperationRequest.overrideConfiguration()
.orElse(null);
executeFuture.whenComplete((r, e) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@
package software.amazon.awssdk.metrics;

import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.utils.Logger;

/**
* A metric collector that doesn't do anything.
*/
@SdkPublicApi
public final class NoOpMetricCollector implements MetricCollector {
private static final Logger log = Logger.loggerFor(NoOpMetricCollector.class);
private static final NoOpMetricCollector INSTANCE = new NoOpMetricCollector();

private NoOpMetricCollector() {
Expand All @@ -34,6 +36,7 @@ public String name() {

@Override
public <T> void reportMetric(SdkMetric<T> metric, T data) {
log.trace(() -> "Metrics reported: " + data);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public final class DefaultMetricCollection implements MetricCollection {


public DefaultMetricCollection(String name, Map<SdkMetric<?>,
List<MetricRecord<?>>> metrics,
List<MetricRecord<?>>> metrics,
List<MetricCollection> children) {
this.name = name;
this.metrics = metrics;
Expand Down Expand Up @@ -68,8 +68,8 @@ public List<MetricCollection> children() {
@Override
public Iterator<MetricRecord<?>> iterator() {
return metrics.values().stream()
.flatMap(List::stream)
.iterator();
.flatMap(List::stream)
.iterator();
}

@Override
Expand Down
Loading