Skip to content

Add User Agent for Sqs Calls made using Automatic Batching Manager #5546

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -51,26 +53,34 @@ protected ChangeMessageVisibilityBatchManager(RequestBatchConfiguration override

private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibilityBatchRequest(
List<IdentifiableMessage<ChangeMessageVisibilityRequest>> identifiedRequests, String batchKey) {
List<ChangeMessageVisibilityBatchRequestEntry> entries = identifiedRequests
.stream()
.map(identifiedRequest -> createChangeMessageVisibilityBatchRequestEntry(identifiedRequest.id(),
identifiedRequest.message()))
.collect(Collectors.toList());
// Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
// all requests must have the same overrideConfiguration so it is sufficient to retrieve it from the first
// request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0).message()

List<ChangeMessageVisibilityBatchRequestEntry> entries =
identifiedRequests.stream()
.map(identifiedRequest -> createChangeMessageVisibilityBatchRequestEntry(
identifiedRequest.id(),
identifiedRequest.message()))
.collect(Collectors.toList());

// All requests have the same overrideConfiguration, so it's sufficient to retrieve it from the first request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0)
.message()
.overrideConfiguration();
return overrideConfiguration.map(
config -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(config)
.entries(entries)
.build())
.orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.entries(entries)
.build());

return overrideConfiguration
.map(config -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(config.toBuilder()
.applyMutation(USER_AGENT_APPLIER)
.build())
.entries(entries)
.build())
.orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(o -> o
.applyMutation(USER_AGENT_APPLIER)
.build())
.entries(entries)
.build());
}

private static ChangeMessageVisibilityBatchRequestEntry createChangeMessageVisibilityBatchRequestEntry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -50,20 +52,38 @@ protected DeleteMessageBatchManager(RequestBatchConfiguration overrideConfigurat

private static DeleteMessageBatchRequest createDeleteMessageBatchRequest(
List<IdentifiableMessage<DeleteMessageRequest>> identifiedRequests, String batchKey) {

List<DeleteMessageBatchRequestEntry> entries = identifiedRequests
.stream()
.map(identifiedRequest -> createDeleteMessageBatchRequestEntry(identifiedRequest.id(),
identifiedRequest.message()))
.map(identifiedRequest -> createDeleteMessageBatchRequestEntry(
identifiedRequest.id(), identifiedRequest.message()
))
.collect(Collectors.toList());

// Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
// all requests must have the same overrideConfiguration so it is sufficient to retrieve it from the first
// request.
// all requests must have the same overrideConfiguration, so it is sufficient to retrieve it from the first request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0).message()
.overrideConfiguration();

return overrideConfiguration.map(
overrideConfig -> DeleteMessageBatchRequest.builder().queueUrl(batchKey).overrideConfiguration(overrideConfig)
.entries(entries).build()).orElse(
DeleteMessageBatchRequest.builder().queueUrl(batchKey).entries(entries).build());
overrideConfig -> DeleteMessageBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(
overrideConfig.toBuilder()
.applyMutation(USER_AGENT_APPLIER)
.build()
)
.entries(entries)
.build()
).orElseGet(
() -> DeleteMessageBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(o ->
o.applyMutation(USER_AGENT_APPLIER).build()
)
.entries(entries)
.build()
);
}

private static DeleteMessageBatchRequestEntry createDeleteMessageBatchRequestEntry(String id, DeleteMessageRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -138,22 +140,21 @@ private CompletableFuture<Map<QueueAttributeName, String>> fetchQueueAttributes(
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
.queueUrl(queueUrl)
.attributeNames(QUEUE_ATTRIBUTE_NAMES)
.overrideConfiguration(o -> o
.applyMutation(USER_AGENT_APPLIER))
.build();

CompletableFuture<Map<QueueAttributeName, String>> future =
sqsClient.getQueueAttributes(request)
.thenApply(response -> {
Map<QueueAttributeName, String> attributes = response.attributes();
Validate.notNull(attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS),
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS
+ " attribute is null in SQS.");
Validate.notNull(attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT),
QueueAttributeName.VISIBILITY_TIMEOUT + " attribute is null in SQS.");
return attributes.entrySet().stream()
.filter(entry -> QUEUE_ATTRIBUTE_NAMES.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
});

return future;
return sqsClient.getQueueAttributes(request)
.thenApply(response -> {
Map<QueueAttributeName, String> attributes = response.attributes();
Validate.notNull(attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS),
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS
+ " attribute is null in SQS.");
Validate.notNull(attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT),
QueueAttributeName.VISIBILITY_TIMEOUT + " attribute is null in SQS.");
return attributes.entrySet().stream()
.filter(entry -> QUEUE_ATTRIBUTE_NAMES.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package software.amazon.awssdk.services.sqs.internal.batchmanager;


import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;

import java.time.Duration;
import java.util.List;
import java.util.Objects;
Expand Down Expand Up @@ -72,7 +74,8 @@ public CompletableFuture<ReceiveSqsMessageHelper> asyncReceiveMessage() {
ReceiveMessageRequest.Builder request =
ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(config.maxBatchItems());
.maxNumberOfMessages(config.maxBatchItems())
.overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER));

if (!CollectionUtils.isNullOrEmpty(config.messageSystemAttributeNames())) {
request.messageSystemAttributeNames(config.messageSystemAttributeNames());
Expand Down Expand Up @@ -158,10 +161,12 @@ private CompletableFuture<ChangeMessageVisibilityBatchResponse> nackMessages() {
.build())
.collect(Collectors.toList());

ChangeMessageVisibilityBatchRequest batchRequest = ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(queueUrl)
.entries(entries)
.build();
ChangeMessageVisibilityBatchRequest batchRequest =
ChangeMessageVisibilityBatchRequest.builder()
.queueUrl(queueUrl)
.entries(entries)
.overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER))
.build();

return asyncClient.changeMessageVisibilityBatch(batchRequest);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -83,19 +85,31 @@ private static IdentifiableMessage<SendMessageResponse> createSendMessageRespons

private static SendMessageBatchRequest createSendMessageBatchRequest(
List<IdentifiableMessage<SendMessageRequest>> identifiedRequests, String batchKey) {
List<SendMessageBatchRequestEntry> entries = identifiedRequests
.stream()
.map(identifiedRequest -> createSendMessageBatchRequestEntry(identifiedRequest.id(), identifiedRequest.message()))
.collect(Collectors.toList());
// Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
// all requests must have the same overrideConfiguration so it is sufficient to retrieve it from the first
// request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0).message()

List<SendMessageBatchRequestEntry> entries =
identifiedRequests.stream()
.map(identifiedRequest -> createSendMessageBatchRequestEntry(identifiedRequest.id(),
identifiedRequest.message()))
.collect(Collectors.toList());

// All requests must have the same overrideConfiguration, so retrieve it from the first request.
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0)
.message()
.overrideConfiguration();
return overrideConfiguration.map(
overrideConfig -> SendMessageBatchRequest.builder().queueUrl(batchKey).overrideConfiguration(overrideConfig)
.entries(entries).build()).orElse(
SendMessageBatchRequest.builder().queueUrl(batchKey).entries(entries).build());

return overrideConfiguration
.map(overrideConfig -> SendMessageBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(overrideConfig.toBuilder()
.applyMutation(USER_AGENT_APPLIER)
.build())
.entries(entries)
.build())
.orElseGet(() -> SendMessageBatchRequest.builder()
.queueUrl(batchKey)
.overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER))
.entries(entries)
.build());
}

private static SendMessageBatchRequestEntry createSendMessageBatchRequestEntry(String id, SendMessageRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.ApiName;

@SdkInternalApi
public final class SqsMessageDefault {
Expand All @@ -24,6 +27,11 @@ public final class SqsMessageDefault {

public static final int MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES = 262_144; // 256 KiB

// abm stands for Automatic Batching Manager
public static final Consumer<AwsRequestOverrideConfiguration.Builder> USER_AGENT_APPLIER =
b -> b.addApiName(ApiName.builder().version("abm").name("hll").build());


/**
* <a href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes">
* AWS SQS Message Attributes Documentation</a>
Expand Down
Loading
Loading