Metric publisher implementation for CloudWatch #1375

Merged 5 commits on Jan 10, 2020

Contributor

@spfink spfink commented Aug 6, 2019

Continuing from #1354

Description

Adds a metric publisher implementation for publishing collected metrics to CloudWatch.

Testing

Tests included.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)

Checklist

  • I have read the CONTRIBUTING document
  • Local run of mvn install succeeds
  • My code follows the code style of this project
  • My change requires a change to the Javadoc documentation
  • I have updated the Javadoc documentation accordingly
  • I have read the README document
  • I have added tests to cover my changes
  • All new and existing tests passed
  • A short description of the change has been added to the CHANGELOG
  • My change is to implement 1.11 parity feature and I have updated LaunchChangelog

License

  • I confirm that this pull request can be released under the Apache 2 license

Contributor

@zoewangg zoewangg left a comment

Still going through the PR

consumerExecutorService.shutdown();
}
} catch (Throwable t) {
log.debug(() -> "An error occurred when closing the CloudWatch publisher", t);
Contributor

Should we log this at warn level? We might also want a try-catch around each individual resource here, so that a failure to close the first resource does not prevent the subsequent resources from being closed.
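
The per-resource try-catch suggested above could look roughly like the following. This is a minimal sketch, not the code that was merged: the `QuietCloser` class and its `closeAll` method are hypothetical names introduced for illustration, and the caller is assumed to log the returned failures itself (for example at warn level).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of the PR): closes resources independently.
final class QuietCloser {
    private QuietCloser() {
    }

    /**
     * Closes every resource in order. A failure in one close() call is
     * recorded and does not stop the remaining resources from being closed.
     * Returns the collected failures so the caller can log them.
     */
    static List<Throwable> closeAll(AutoCloseable... resources) {
        List<Throwable> failures = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // nothing to close
            }
            try {
                resource.close();
            } catch (Throwable t) {
                // Remember the failure and keep going with the next resource.
                failures.add(t);
            }
        }
        return failures;
    }
}
```

Because `AutoCloseable` has a single abstract method, an executor can be passed as a method reference, for example `QuietCloser.closeAll(producerExecutorService::shutdown, consumerExecutorService::shutdown, client)`.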

private Duration publishFrequency = DEFAULT_PUBLISH_FREQUENCY;
private String namespace = DEFAULT_NAMESPACE;
private int metricQueueSize = QUEUE_SIZE;

Contributor

Missing private ctor


private final CloudWatchAsyncClient client;

private String namespace;
Contributor

final?

* Retrieves {@link MetricDatum}s from the shared blocking queue and upload them to Amazon CloudWatch.
*/
@SdkInternalApi
public class Consumer implements Callable<CompletableFuture<PutMetricDataResponse>> {
Contributor

Can we rename it to something like MetricConsumer? It could be confused with Java's java.util.function.Consumer interface.

<version>2.7.12-SNAPSHOT</version>
</parent>

<artifactId>cloudwatch-publisher</artifactId>
Contributor

Should we rename it to cloudwatch-metrics-publisher? My concern is that there might be a cloudwatch publisher service in the future, who knows.

log.debug(() -> "Due to capacity restrictions, some items from the metric registry are not added to the queue.",
exception);
} catch (Exception e) {
log.debug(() -> "Some error adding metrics to queue", e);
Contributor

Should this be warn? Nit: how about "An error occurred when adding metrics to the queue"?


queue.addAll(results);
log.debug(() -> "Number of metrics added to queue: " + results.size());
} catch (IllegalStateException exception) {
Contributor

Do we need to catch IllegalStateException just for extra logging?
I feel the logging at line 65 is sufficient


private BlockingQueue<MetricDatum> queue;
private MetricTransformer metricTransformer;

Contributor

missing private ctor

import software.amazon.awssdk.testutils.service.AwsIntegrationTestBase;

@RunWith(MockitoJUnitRunner.class)
public class CloudWatchPublisherIntegrationTest extends AwsIntegrationTestBase {
Contributor

Hmm, this is not an integration test though. Can we add some integration tests?

log.debug(() -> "An error occurred when uploading metrics to CloudWatch.", throwable);
}

CompletableFuture<Void> finalFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
Contributor

Do we need to cancel the other futures if one of them fails with a non-recoverable error such as a 400? It's likely that all the futures would then fail for the same reason. We are doing this for the transfer manager.

https://github.com/aws/aws-sdk-java-v2/blob/s3-transfermanager/utils/src/main/java/software/amazon/awssdk/utils/CompletableFutureUtils.java#L113

Contributor Author

Why do we think that one 400 means all the other requests will also fail with a 400?

Contributor

It might not necessarily be a 400; it could be something like a permission issue, where all requests would fail if the credentials are not correct. We don't have to do it right now; it's just something to think about.
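
For reference, the fail-fast idea discussed in this thread can be sketched with plain `CompletableFuture` calls. This is an illustration under stated assumptions, not the linked `CompletableFutureUtils` code: the `FailFastFutures` class and `allOfFailFast` method are hypothetical names, and the sketch cancels the siblings on any failure rather than only on non-recoverable ones.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper (not part of the PR): allOf that cancels siblings on failure.
final class FailFastFutures {
    private FailFastFutures() {
    }

    /**
     * Returns a future that completes when all given futures complete.
     * As soon as any one of them fails, the remaining ones are cancelled,
     * so the combined future finishes without waiting for them.
     */
    static <T> CompletableFuture<Void> allOfFailFast(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));

        for (CompletableFuture<T> future : futures) {
            future.whenComplete((result, error) -> {
                if (error == null) {
                    return; // success: nothing to cancel
                }
                for (CompletableFuture<T> other : futures) {
                    if (other != future) {
                        // No-op for futures that have already completed.
                        other.cancel(true);
                    }
                }
            });
        }
        return all;
    }
}
```

Note that `cancel` on a `CompletableFuture` only completes that future with a `CancellationException`; whether the underlying request is actually aborted depends on how the future was produced.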

@spfink spfink force-pushed the finks/metrics-cw branch from ac755ff to 05bf05e Compare August 8, 2019 20:36
producerExecutorService.shutdown();
}

if (client != null) {
Contributor

Should we do try-catch for individual resources here so that the failure to close the first resource would not affect the closing of the subsequent resources?

Contributor Author

Done

private final String namespace;

private MetricConsumer(Builder builder) {
this.client = Validate.notNull(builder.cloudWatchClient, "CloudWatch client cannot be null");
Contributor

nit, we can use Validate.paramNotNull(builder.cloudWatchClient, "cloudWatchClient")

Contributor Author

Updated

private MetricConsumer(Builder builder) {
this.client = Validate.notNull(builder.cloudWatchClient, "CloudWatch client cannot be null");
this.queue = Validate.notNull(builder.queue, "Queue cannot be null");
this.namespace = Validate.notNull(builder.namespace, "Namespace cannot be null or empty.");
Contributor

This seems to check only for null, not for empty.

Contributor Author

Fixed
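
The thread does not show what the fix looked like. As a minimal sketch of the distinction being raised, a check that rejects both null and empty (or whitespace-only) values could be written as follows; `ParamChecks` and `notBlank` are hypothetical names used only for this illustration.

```java
// Hypothetical helper (not part of the PR): rejects null and blank strings.
final class ParamChecks {
    private ParamChecks() {
    }

    /**
     * Returns the value unchanged if it is non-null and contains at least
     * one non-whitespace character; otherwise throws.
     */
    static String notBlank(String value, String paramName) {
        if (value == null) {
            throw new NullPointerException(paramName + " must not be null.");
        }
        if (value.trim().isEmpty()) {
            throw new IllegalArgumentException(paramName + " must not be blank or empty.");
        }
        return value;
    }
}
```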

import software.amazon.awssdk.testutils.service.AwsIntegrationTestBase;

@RunWith(MockitoJUnitRunner.class)
public class CloudWatchPublisherIntegrationTest extends AwsIntegrationTestBase {
Contributor

Can we rename the test since it's not an integration test? Can we add an integration test?

Contributor Author

Renamed. Will add integration test once I combine this and the other PR.


private MetricProducer(Builder builder) {
this.queue = Validate.notNull(builder.queue, "Queue cannot be null");
this.metricTransformer = builder.metricTransformer != null ? builder.metricTransformer
Contributor

Can a customer provide a metricTransformer? It doesn't look like it from the CloudWatchMetricsPublisher APIs.
Should we remove it from the builder?

Contributor Author

Removed and added note to sync with Varun on this. Not sure why it was ever exposed.

int count = 0;

try {
while (queue.peek() != null && ++count <= MAX_SERVICE_CALLS_PER_PUBLISH) {
Contributor

What do we think about moving the queue into the MetricConsumer class? This class doesn't need to know about the queue at all, and having two classes with access to the same queue might be problematic in the future.

Contributor Author

As in move this logic to the MetricConsumer class? Not sure I follow otherwise.

Contributor

Yeah, I meant moving the creation of the queue and this logic into MetricConsumer class.

Contributor Author

The MetricProducer also uses the queue.

Contributor

Oh, I missed that part. Do we need to check queue.peek here? Can we move this logic to the consumer?

return client.putMetricData(request);
}

private List<MetricDatum> metricDatums() {
Contributor

Do we need this list? Can we poll from the queue directly?

Contributor Author

We need a separate list to guarantee the size of the data being published to CloudWatch.
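
One way to build such a size-bounded list from a shared queue is `BlockingQueue.drainTo` with a maximum element count. This is a sketch of the general technique, not necessarily what `metricDatums()` does in the PR; `BatchDrainer`, `nextBatch`, and the `maxBatchSize` parameter are hypothetical names, and the actual per-request limit is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

// Hypothetical helper (not part of the PR): pulls a bounded batch off a queue.
final class BatchDrainer {
    private BatchDrainer() {
    }

    /**
     * Removes up to maxBatchSize elements from the queue without blocking
     * and returns them in queue order. The result may be smaller than
     * maxBatchSize (or empty) if the queue holds fewer elements.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxBatchSize) {
        List<T> batch = new ArrayList<>(maxBatchSize);
        queue.drainTo(batch, maxBatchSize);
        return batch;
    }
}
```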

*/
public void addMetrics(MetricRegistry metricRegistry) {
try {
List<MetricDatum> results = new ArrayList<>();
Contributor

@zoewangg zoewangg Aug 9, 2019

Same question: do we need this list? Can we offer the items to the queue directly?

Contributor Author

In this case, it helps maintain a bit of grouping. Rather than transforming and adding to the queue X times, where other threads could also be touching the queue, this adds the entire list to the queue at once.
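
One detail worth keeping in mind with this approach: the excerpts do not show which `BlockingQueue` implementation the PR uses, but for the standard bounded `ArrayBlockingQueue`, `addAll` is inherited from `AbstractQueue` and adds elements one at a time. If the queue fills up midway, an `IllegalStateException` is thrown and the elements added so far stay in the queue. The small demo below (with the hypothetical class name `AddAllDemo`) illustrates that behaviour under this assumption.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo (not part of the PR): addAll on a bounded queue can partially succeed.
final class AddAllDemo {
    private AddAllDemo() {
    }

    /** Returns the queue size after a bulk add that exceeds the capacity. */
    static int sizeAfterOverflowingAddAll() {
        // Assumption: a bounded queue with room for three elements.
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(3);
        try {
            // Five elements do not fit; the fourth add() throws.
            queue.addAll(Arrays.asList("a", "b", "c", "d", "e"));
        } catch (IllegalStateException e) {
            // "Queue full": the first three elements were already added.
        }
        return queue.size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterOverflowingAddAll()); // prints 3
    }
}
```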

@spfink spfink closed this Aug 12, 2019
@spfink spfink reopened this Aug 12, 2019
Contributor

dagnir commented Jan 9, 2020

What's the status of this PR? @spfink

@dagnir dagnir changed the base branch from master to sdk-metrics-development January 9, 2020 22:24
dagnir added 3 commits January 9, 2020 14:24
 - Fix compile issues related to renamed/changed class
 - Fix failing tests
 - Fix undeclared dependencies
@sonarqubecloud

Kudos, SonarCloud Quality Gate passed!

  • 0 Bugs (rating A)
  • 0 Vulnerabilities (rating A), 0 Security Hotspots to review
  • 7 Code Smells (rating A)
  • No coverage information
  • No duplication information

@dagnir dagnir merged commit 8ff69a0 into sdk-metrics-development Jan 10, 2020
@dagnir dagnir deleted the finks/metrics-cw branch January 10, 2020 22:18
3 participants