Skip to content

Commit 02bca82

Browse files
committed
Add CloudWatchPublisher
1 parent f7d8996 commit 02bca82

File tree

13 files changed

+922
-18
lines changed

13 files changed

+922
-18
lines changed

core/metrics-spi/src/main/java/software/amazon/awssdk/metrics/meter/Counter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
* @param <T> type of the stored value
2424
*/
2525
@SdkPublicApi
26-
public interface Counter<T> extends Metric, Counting<T> {
26+
public interface Counter<T extends Number> extends Metric, Counting<T> {
2727

2828
/**
2929
* Increment the metric value by 1 unit

core/metrics-spi/src/main/java/software/amazon/awssdk/metrics/meter/Counting.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
* @param <T> type of the count value
2323
*/
2424
@SdkPublicApi
25-
public interface Counting<T> {
25+
public interface Counting<T extends Number> {
2626

2727
/**
2828
* Returns the current count.

core/metrics-spi/src/main/java/software/amazon/awssdk/metrics/meter/NoOpCounter.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
package software.amazon.awssdk.metrics.meter;
2-
31
/*
42
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
53
*
@@ -15,6 +13,8 @@
1513
* permissions and limitations under the License.
1614
*/
1715

16+
package software.amazon.awssdk.metrics.meter;
17+
1818
import software.amazon.awssdk.annotations.SdkPublicApi;
1919

2020
/**

core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,11 @@
6868
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
6969
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
7070
import software.amazon.awssdk.metrics.provider.DefaultMetricConfigurationProviderChain;
71+
import software.amazon.awssdk.metrics.publisher.MetricPublisher;
7172
import software.amazon.awssdk.metrics.publisher.MetricPublisherConfiguration;
7273
import software.amazon.awssdk.utils.AttributeMap;
7374
import software.amazon.awssdk.utils.Either;
75+
import software.amazon.awssdk.utils.Logger;
7476
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
7577
import software.amazon.awssdk.utils.Validate;
7678

@@ -93,6 +95,9 @@
9395
*/
9496
@SdkProtectedApi
9597
public abstract class SdkDefaultClientBuilder<B extends SdkClientBuilder<B, C>, C> implements SdkClientBuilder<B, C> {
98+
private static final String CLOUDWATCH_PUBLISHER_PATH =
99+
"software.amazon.awssdk.metrics.publishers.cloudwatch.CloudWatchPublisher";
100+
private static final Logger log = Logger.loggerFor(SdkDefaultClientBuilder.class);
96101

97102
private static final SdkHttpClient.Builder DEFAULT_HTTP_CLIENT_BUILDER = new DefaultSdkHttpClientBuilder();
98103
private static final SdkAsyncHttpClient.Builder DEFAULT_ASYNC_HTTP_CLIENT_BUILDER = new DefaultSdkAsyncHttpClientBuilder();
@@ -357,12 +362,18 @@ public final B overrideConfiguration(ClientOverrideConfiguration overrideConfig)
357362
return thisBuilder();
358363
}
359364

360-
// TODO
361-
// Create an instance of Cloudwatch publisher from classloader and set is using addPublisher() method
362365
private MetricPublisherConfiguration loadDefaultMetricPublisher() {
363-
return MetricPublisherConfiguration.builder()
364-
//.addPublisher()
365-
.build();
366+
MetricPublisherConfiguration.Builder config = MetricPublisherConfiguration.builder();
367+
368+
try {
369+
Class<?> clazz = Class.forName(CLOUDWATCH_PUBLISHER_PATH);
370+
MetricPublisher publisher = (MetricPublisher) clazz.getMethod("create").invoke(null);
371+
config.addPublisher(publisher);
372+
} catch (Exception e) {
373+
log.debug(() -> "Failed to create the default CloudWatch metric publisher", e);
374+
}
375+
376+
return config.build();
366377
}
367378

368379
public final void setOverrideConfiguration(ClientOverrideConfiguration overrideConfiguration) {

metrics-publishers/cloudwatch-publisher/pom.xml

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,18 @@
5252
<version>${awsjavasdk.version}</version>
5353
</dependency>
5454

55+
<dependency>
56+
<groupId>software.amazon.awssdk</groupId>
57+
<artifactId>test-utils</artifactId>
58+
<version>${awsjavasdk.version}</version>
59+
<scope>test</scope>
60+
</dependency>
61+
<dependency>
62+
<groupId>software.amazon.awssdk</groupId>
63+
<artifactId>service-test-utils</artifactId>
64+
<version>${awsjavasdk.version}</version>
65+
<scope>test</scope>
66+
</dependency>
5567
<dependency>
5668
<groupId>junit</groupId>
5769
<artifactId>junit</artifactId>
@@ -63,13 +75,8 @@
6375
<scope>test</scope>
6476
</dependency>
6577
<dependency>
66-
<groupId>org.hamcrest</groupId>
67-
<artifactId>hamcrest-all</artifactId>
68-
<scope>test</scope>
69-
</dependency>
70-
<dependency>
71-
<groupId>com.github.tomakehurst</groupId>
72-
<artifactId>wiremock</artifactId>
78+
<groupId>org.mockito</groupId>
79+
<artifactId>mockito-core</artifactId>
7380
<scope>test</scope>
7481
</dependency>
7582
</dependencies>
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,209 @@
1+
/*
2+
* Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
116
package software.amazon.awssdk.metrics.publishers.cloudwatch;
217

18+
import java.time.Duration;
19+
import java.util.ArrayList;
20+
import java.util.List;
21+
import java.util.concurrent.BlockingQueue;
322
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.Executors;
25+
import java.util.concurrent.LinkedBlockingQueue;
26+
import java.util.concurrent.ScheduledExecutorService;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicBoolean;
29+
import software.amazon.awssdk.annotations.SdkPublicApi;
430
import software.amazon.awssdk.metrics.publisher.MetricPublisher;
31+
import software.amazon.awssdk.metrics.publishers.cloudwatch.internal.Consumer;
32+
import software.amazon.awssdk.metrics.publishers.cloudwatch.internal.Producer;
533
import software.amazon.awssdk.metrics.registry.MetricRegistry;
34+
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
35+
import software.amazon.awssdk.services.cloudwatch.model.MetricDatum;
36+
import software.amazon.awssdk.services.cloudwatch.model.PutMetricDataResponse;
37+
import software.amazon.awssdk.utils.Logger;
38+
import software.amazon.awssdk.utils.ThreadFactoryBuilder;
39+
import software.amazon.awssdk.utils.Validate;
40+
41+
/**
42+
* An implementation of the {@link MetricPublisher} that uploads metrics to Amazon CloudWatch.
43+
*/
44+
@SdkPublicApi
45+
public final class CloudWatchPublisher implements MetricPublisher {
46+
47+
private static final Logger log = Logger.loggerFor(CloudWatchPublisher.class);
648

7-
public class CloudWatchPublisher implements MetricPublisher {
49+
/**
50+
* Limit max number of service calls to CloudWatch in a single {@link #publish()} method call
51+
*/
52+
private static final int MAX_SERVICE_CALLS_PER_PUBLISH = 10;
53+
54+
private final CloudWatchAsyncClient client;
55+
private final Duration publishFrequency;
56+
private String namespace;
57+
private final int metricQueueSize;
58+
private final BlockingQueue<MetricDatum> queue;
59+
private final Producer producer;
60+
private final ExecutorService producerExecutorService;
61+
private final Consumer consumer;
62+
private final ScheduledExecutorService consumerExecutorService;
63+
private final AtomicBoolean publishStarted = new AtomicBoolean(false);
64+
65+
private CloudWatchPublisher(Builder builder) {
66+
this.client = resolveClient(builder.client);
67+
this.publishFrequency = Validate.notNull(builder.publishFrequency, "Publish frequency cannot be null.");
68+
this.namespace = Validate.notEmpty(builder.namespace, "Namespace cannot be null or empty.");
69+
this.metricQueueSize = Validate.isPositive(builder.metricQueueSize, "Metric queue size should be positive.");
70+
this.queue = new LinkedBlockingQueue<>(this.metricQueueSize);
71+
this.producer = Producer.builder().queue(queue).build();
72+
this.producerExecutorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
73+
.threadNamePrefix("sdk-cloudwatch-producer")
74+
.build());
75+
this.consumer= Consumer.builder()
76+
.cloudWatchClient(client)
77+
.queue(queue)
78+
.namespace(namespace)
79+
.build();
80+
this.consumerExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder()
81+
.threadNamePrefix("sdk-cloudwatch-consumer")
82+
.build());
83+
}
884

85+
/**
86+
* CloudWatch publisher converts the given metrics into {@link MetricDatum} instances and add them
87+
* to a queue for publishing.
88+
*
89+
* @param metricsRegistry registry containing the collected metrics
90+
*/
991
@Override
1092
public void registerMetrics(MetricRegistry metricsRegistry) {
93+
try {
94+
if (publishStarted.compareAndSet(false, true)) {
95+
consumerExecutorService.scheduleAtFixedRate(this::publish, 0L,publishFrequency.toMillis(), TimeUnit.MILLISECONDS);
96+
}
1197

98+
producerExecutorService.execute(() -> producer.addMetrics(metricsRegistry));
99+
} catch (Throwable throwable) {
100+
log.debug(() -> "An error occurred when registering metrics in the publisher", throwable);
101+
}
12102
}
13103

14104
@Override
15105
public CompletableFuture<Void> publish() {
16-
return null;
106+
List<CompletableFuture<PutMetricDataResponse>> futures = new ArrayList<>();
107+
int count = 0;
108+
109+
try {
110+
while (queue.peek() != null && ++count <= MAX_SERVICE_CALLS_PER_PUBLISH) {
111+
futures.add(consumer.call());
112+
}
113+
} catch (Throwable throwable) {
114+
log.debug(() -> "An error occurred when uploading metrics to CloudWatch.", throwable);
115+
}
116+
117+
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
17118
}
18119

19120
@Override
20121
public void close() throws Exception {
122+
if (producerExecutorService != null) {
123+
producerExecutorService.shutdown();
124+
}
125+
126+
if (client != null) {
127+
client.close();
128+
}
129+
130+
if (consumerExecutorService != null) {
131+
consumerExecutorService.shutdown();
132+
}
133+
}
134+
135+
private CloudWatchAsyncClient resolveClient(CloudWatchAsyncClient builderClient) {
136+
return builderClient != null ? builderClient : CloudWatchAsyncClient.create();
137+
}
138+
139+
/**
140+
* @return A {@link Builder} object to build {@link CloudWatchPublisher}.
141+
*/
142+
public static Builder builder() {
143+
return new Builder();
144+
}
145+
146+
/**
147+
* @return A new instance of {@link CloudWatchPublisher} with all defaults.
148+
*/
149+
public static CloudWatchPublisher create() {
150+
return builder().build();
151+
}
152+
153+
/**
154+
* Builder class to construct {@link CloudWatchPublisher} instances.
155+
*/
156+
public static final class Builder {
157+
private static final String DEFAULT_NAMESPACE = "AwsSdk/JavaSdk2x";
158+
private static final int QUEUE_SIZE = 100;
159+
private static final Duration DEFAULT_PUBLISH_FREQUENCY = Duration.ofMinutes(1);
160+
161+
private CloudWatchAsyncClient client;
162+
private Duration publishFrequency = DEFAULT_PUBLISH_FREQUENCY;
163+
private String namespace = DEFAULT_NAMESPACE;
164+
private int metricQueueSize = QUEUE_SIZE;
165+
166+
/**
167+
* @param client async client to use for uploads metrics to Amazon CloudWatch
168+
* @return This object for method chaining
169+
*/
170+
Builder cloudWatchClient(CloudWatchAsyncClient client) {
171+
this.client = client;
172+
return this;
173+
}
174+
175+
/**
176+
* @param publishFrequency the timeout between consecutive {@link CloudWatchPublisher#publish()} calls
177+
* @return This object for method chaining
178+
*/
179+
Builder publishFrequency(Duration publishFrequency) {
180+
this.publishFrequency = publishFrequency;
181+
return this;
182+
}
183+
184+
/**
185+
* @param metricQueueSize max number of metrics to store in queue. If the queue is full, new metrics are dropped
186+
* @return This object for method chaining
187+
*/
188+
Builder metricQueueSize(int metricQueueSize) {
189+
this.metricQueueSize = metricQueueSize;
190+
return this;
191+
}
192+
193+
/**
194+
* @param namespace The CloudWatch namespace for the metric data
195+
* @return This object for method chaining
196+
*/
197+
Builder namespace(String namespace) {
198+
this.namespace = namespace;
199+
return this;
200+
}
21201

202+
/**
203+
* @return an instance of {@link CloudWatchPublisher}
204+
*/
205+
CloudWatchPublisher build() {
206+
return new CloudWatchPublisher(this);
207+
}
22208
}
23209
}

0 commit comments

Comments
 (0)