Metric publisher implementation for CloudWatch #1375
Conversation
Still going through the PR
consumerExecutorService.shutdown();
}
} catch (Throwable t) {
log.debug(() -> "An error occurred when closing the CloudWatch publisher", t);
Should we log this at warn? We might also want a try-catch around each individual resource here, so that a failure to close the first resource does not affect closing the subsequent ones.
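A minimal sketch of the per-resource try-catch suggested above, not the code from this PR. `IndependentClose` and `closeAll` are hypothetical names; an executor service can be passed as `executor::shutdown`, and the caller would log the returned failures at whichever level is agreed on.

```java
import java.util.ArrayList;
import java.util.List;

final class IndependentClose {
    private IndependentClose() {
    }

    /**
     * Closes every resource in order, even when an earlier close fails.
     * Returns the collected failures so the caller can log them.
     */
    static List<Throwable> closeAll(AutoCloseable... resources) {
        List<Throwable> failures = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // nothing to close
            }
            try {
                resource.close();
            } catch (Exception e) {
                // Record the failure and keep going with the remaining resources.
                failures.add(e);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> closed = new ArrayList<>();
        List<Throwable> failures = closeAll(
            () -> { throw new IllegalStateException("first close failed"); },
            () -> closed.add("second"));
        System.out.println("closed=" + closed + ", failures=" + failures.size());
    }
}
```

With this shape, a failure in the first close does not prevent the second one from running.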
private Duration publishFrequency = DEFAULT_PUBLISH_FREQUENCY;
private String namespace = DEFAULT_NAMESPACE;
private int metricQueueSize = QUEUE_SIZE;
Missing private ctor
private final CloudWatchAsyncClient client;

private String namespace;
final?
* Retrieves {@link MetricDatum}s from the shared blocking queue and upload them to Amazon CloudWatch.
*/
@SdkInternalApi
public class Consumer implements Callable<CompletableFuture<PutMetricDataResponse>> {
Can we rename it to something like MetricConsumer? It could be confused with the Java Consumer interface.
<version>2.7.12-SNAPSHOT</version>
</parent>

<artifactId>cloudwatch-publisher</artifactId>
Should we rename it to cloudwatch-metrics-publisher? My concern is that there might be a CloudWatch publisher service in the future, who knows.
log.debug(() -> "Due to capacity restrictions, some items from the metric registry are not added to the queue.",
exception);
} catch (Exception e) {
log.debug(() -> "Some error adding metrics to queue", e);
warn? Nit: "An error occurred when adding metrics to the queue"?
queue.addAll(results);
log.debug(() -> "Number of metrics added to queue: " + results.size());
} catch (IllegalStateException exception) {
Do we need to catch IllegalStateException just for extra logging? I feel the logging at line 65 is sufficient.
private BlockingQueue<MetricDatum> queue;
private MetricTransformer metricTransformer;
missing private ctor
import software.amazon.awssdk.testutils.service.AwsIntegrationTestBase;

@RunWith(MockitoJUnitRunner.class)
public class CloudWatchPublisherIntegrationTest extends AwsIntegrationTestBase {
Hmm, this is not an integration test though. Can we add some integration tests?
log.debug(() -> "An error occurred when uploading metrics to CloudWatch.", throwable);
}

CompletableFuture<Void> finalFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]));
Do we need to cancel the other futures if one future fails with a non-recoverable error like a 400? It's likely that all the futures will fail for the same reason. We are doing this for TransferManager.
Why do we think that one 400 means every request will return a 400?
It might not necessarily be a 400; it could be something like a permission issue, where all requests would fail if the credentials are not correct. We don't have to do it right now, it's just something to think about.
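For reference, one way the fail-fast idea in this thread could look with plain `CompletableFuture`s. This is a sketch under assumptions, not the PR's or TransferManager's code: `FailFastFutures`, `allOfFailFast`, and `isNonRecoverable` are hypothetical names, and `SecurityException` merely stands in for whatever error type would signal a credentials or permission problem.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class FailFastFutures {
    private FailFastFutures() {
    }

    /**
     * Behaves like CompletableFuture.allOf, but cancels the remaining futures
     * as soon as one of them fails with an error judged non-recoverable.
     */
    static <T> CompletableFuture<Void> allOfFailFast(List<CompletableFuture<T>> futures) {
        CompletableFuture<Void> all =
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        for (CompletableFuture<T> future : futures) {
            future.whenComplete((result, error) -> {
                if (error != null && isNonRecoverable(error)) {
                    // cancel() has no effect on futures that are already complete.
                    futures.forEach(other -> other.cancel(true));
                }
            });
        }
        return all;
    }

    /** Hypothetical classification: SecurityException stands in for a permission error. */
    static boolean isNonRecoverable(Throwable error) {
        Throwable cause = (error instanceof CompletionException && error.getCause() != null)
            ? error.getCause()
            : error;
        return cause instanceof SecurityException;
    }
}
```

Note that cancelling a `CompletableFuture` only completes it with a `CancellationException`; the `mayInterruptIfRunning` flag has no effect in that class, so whether the underlying request is actually aborted depends on the client producing the future.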
ac755ff to 05bf05e
producerExecutorService.shutdown();
}

if (client != null) {
Should we do try-catch for individual resources here so that the failure to close the first resource would not affect the closing of the subsequent resources?
Done
private final String namespace;

private MetricConsumer(Builder builder) {
this.client = Validate.notNull(builder.cloudWatchClient, "CloudWatch client cannot be null");
nit, we can use Validate.paramNotNull(builder.cloudWatchClient, "cloudWatchClient")
Updated
private MetricConsumer(Builder builder) {
this.client = Validate.notNull(builder.cloudWatchClient, "CloudWatch client cannot be null");
this.queue = Validate.notNull(builder.queue, "Queue cannot be null");
this.namespace = Validate.notNull(builder.namespace, "Namespace cannot be null or empty.");
This seems to check only for null, not for empty.
Fixed
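To illustrate the gap pointed out in this thread, here is a small stand-alone sketch of a check that rejects both null and empty (or blank) values. `NamespaceValidation.requireNotBlank` is a hypothetical helper written against the JDK only; it is not the SDK's `Validate` class and not necessarily how the fix was made in the PR.

```java
final class NamespaceValidation {
    private NamespaceValidation() {
    }

    /**
     * Returns the value unchanged if it is non-null and contains at least one
     * non-whitespace character; otherwise throws.
     */
    static String requireNotBlank(String value, String paramName) {
        if (value == null) {
            throw new NullPointerException(paramName + " must not be null.");
        }
        if (value.trim().isEmpty()) {
            // A null-only check would let "" or "   " through.
            throw new IllegalArgumentException(paramName + " must not be empty or blank.");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(requireNotBlank("MyApp", "namespace"));
    }
}
```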
import software.amazon.awssdk.testutils.service.AwsIntegrationTestBase;

@RunWith(MockitoJUnitRunner.class)
public class CloudWatchPublisherIntegrationTest extends AwsIntegrationTestBase {
Can we rename the test since it's not an integration test? Can we add an integration test?
Renamed. Will add integration test once I combine this and the other PR.
private MetricProducer(Builder builder) {
this.queue = Validate.notNull(builder.queue, "Queue cannot be null");
this.metricTransformer = builder.metricTransformer != null ? builder.metricTransformer
Can a customer provide metricTransformer? It doesn't look like it from the CloudWatchMetricsPublisher APIs. Should we remove it from the builder?
Removed and added note to sync with Varun on this. Not sure why it was ever exposed.
int count = 0;

try {
while (queue.peek() != null && ++count <= MAX_SERVICE_CALLS_PER_PUBLISH) {
What do we think about moving the queue into the MetricConsumer class? This class doesn't need to have knowledge of the queue at all, and two classes having access to the same queue might be problematic in the future.
As in move this logic to the MetricConsumer class? Not sure I follow otherwise.
Yeah, I meant moving the creation of the queue and this logic into the MetricConsumer class.
The MetricProducer also uses the queue.
Oh, I missed that part. Do we need to check queue.peek here? Can we move this logic to the consumer?
return client.putMetricData(request);
}

private List<MetricDatum> metricDatums() {
Do we need this list? Can we poll from the queue directly?
We need a separate list to guarantee the size of the data being published to CloudWatch.
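As an illustration of bounding the batch size with a separate list, the sketch below uses `BlockingQueue.drainTo(collection, maxElements)` from the JDK. The names `BatchDrain`, `nextBatch`, and the limit of 20 are assumptions made for the example; the actual per-request limit and the PR's `metricDatums()` implementation are not shown in this conversation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BatchDrain {
    // Hypothetical cap on items per request; the real limit is not stated here.
    static final int MAX_ITEMS_PER_REQUEST = 20;

    private BatchDrain() {
    }

    /** Removes at most maxItems elements from the queue and returns them as one batch. */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxItems) {
        List<T> batch = new ArrayList<>(maxItems);
        // drainTo does not block; it moves whatever is available, up to maxItems.
        queue.drainTo(batch, maxItems);
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 45; i++) {
            queue.add(i);
        }
        List<Integer> batch;
        while (!(batch = nextBatch(queue, MAX_ITEMS_PER_REQUEST)).isEmpty()) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```

Because the batch is an ordinary list, its size is fixed once it has been drained, regardless of what producers add to the queue afterwards.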
*/
public void addMetrics(MetricRegistry metricRegistry) {
try {
List<MetricDatum> results = new ArrayList<>();
Same question: do we need this list? Can we offer the items to the queue directly?
In this case, it helps maintain a bit of grouping. Rather than transforming and adding to the queue X times, where other threads could also be touching the queue, this adds the entire list at once.
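One caveat that may be worth checking here: the `BlockingQueue` documentation says bulk operations such as `addAll` are not necessarily atomic, so on a bounded queue `addAll` can throw `IllegalStateException` after some elements have already been added. The sketch below demonstrates that behaviour with a JDK `LinkedBlockingQueue` and shows a hypothetical `offerAll` helper that reports how many items were accepted instead of throwing; it is not the PR's code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BoundedQueueAdd {
    private BoundedQueueAdd() {
    }

    /** Offers items one by one and stops at the first rejection; returns the accepted count. */
    static <T> int offerAll(BlockingQueue<T> queue, List<T> items) {
        int accepted = 0;
        for (T item : items) {
            if (!queue.offer(item)) {
                break; // queue is full; the remaining items are not added
            }
            accepted++;
        }
        return accepted;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
        try {
            queue.addAll(Arrays.asList(1, 2, 3));
        } catch (IllegalStateException e) {
            // The first elements were queued before the exception was thrown.
            System.out.println("addAll failed with " + queue.size() + " items already queued");
        }

        BlockingQueue<Integer> other = new LinkedBlockingQueue<>(2);
        System.out.println("offerAll accepted " + offerAll(other, Arrays.asList(1, 2, 3)));
    }
}
```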
05bf05e to e3a5ef1
e3a5ef1 to f8e7fb6
What's the status of this PR? @spfink
- Fix compile issues related to renamed/changed class
- Fix failing tests
- Fix undeclared dependencies
Kudos, SonarCloud Quality Gate passed!
Continuing from #1354
Description
Adds a metric publisher implementation for publishing collected metrics to CloudWatch.
Testing
Tests included.