Skip to content

Commit e570e2b

Browse files
authored
Add User Agent for Sqs Calls made using Automatic Batching Manager (#5546)
* Add User Agent for Sqs Calls made using Automatic Batching Manager as hll/abm * Review comments
1 parent 041eaa4 commit e570e2b

File tree

9 files changed

+213
-77
lines changed

9 files changed

+213
-77
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;
19+
1820
import java.util.ArrayList;
1921
import java.util.List;
2022
import java.util.Optional;
@@ -51,26 +53,34 @@ protected ChangeMessageVisibilityBatchManager(RequestBatchConfiguration override
5153

5254
private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibilityBatchRequest(
5355
List<IdentifiableMessage<ChangeMessageVisibilityRequest>> identifiedRequests, String batchKey) {
54-
List<ChangeMessageVisibilityBatchRequestEntry> entries = identifiedRequests
55-
.stream()
56-
.map(identifiedRequest -> createChangeMessageVisibilityBatchRequestEntry(identifiedRequest.id(),
57-
identifiedRequest.message()))
58-
.collect(Collectors.toList());
59-
// Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
60-
// all requests must have the same overrideConfiguration so it is sufficient to retrieve it from the first
61-
// request.
62-
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0).message()
56+
57+
List<ChangeMessageVisibilityBatchRequestEntry> entries =
58+
identifiedRequests.stream()
59+
.map(identifiedRequest -> createChangeMessageVisibilityBatchRequestEntry(
60+
identifiedRequest.id(),
61+
identifiedRequest.message()))
62+
.collect(Collectors.toList());
63+
64+
// All requests have the same overrideConfiguration, so it's sufficient to retrieve it from the first request.
65+
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0)
66+
.message()
6367
.overrideConfiguration();
64-
return overrideConfiguration.map(
65-
config -> ChangeMessageVisibilityBatchRequest.builder()
66-
.queueUrl(batchKey)
67-
.overrideConfiguration(config)
68-
.entries(entries)
69-
.build())
70-
.orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder()
71-
.queueUrl(batchKey)
72-
.entries(entries)
73-
.build());
68+
69+
return overrideConfiguration
70+
.map(config -> ChangeMessageVisibilityBatchRequest.builder()
71+
.queueUrl(batchKey)
72+
.overrideConfiguration(config.toBuilder()
73+
.applyMutation(USER_AGENT_APPLIER)
74+
.build())
75+
.entries(entries)
76+
.build())
77+
.orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder()
78+
.queueUrl(batchKey)
79+
.overrideConfiguration(o -> o
80+
.applyMutation(USER_AGENT_APPLIER)
81+
.build())
82+
.entries(entries)
83+
.build());
7484
}
7585

7686
private static ChangeMessageVisibilityBatchRequestEntry createChangeMessageVisibilityBatchRequestEntry(

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;
19+
1820
import java.util.ArrayList;
1921
import java.util.List;
2022
import java.util.Optional;
@@ -50,20 +52,38 @@ protected DeleteMessageBatchManager(RequestBatchConfiguration overrideConfigurat
5052

5153
private static DeleteMessageBatchRequest createDeleteMessageBatchRequest(
5254
List<IdentifiableMessage<DeleteMessageRequest>> identifiedRequests, String batchKey) {
55+
5356
List<DeleteMessageBatchRequestEntry> entries = identifiedRequests
5457
.stream()
55-
.map(identifiedRequest -> createDeleteMessageBatchRequestEntry(identifiedRequest.id(),
56-
identifiedRequest.message()))
58+
.map(identifiedRequest -> createDeleteMessageBatchRequestEntry(
59+
identifiedRequest.id(), identifiedRequest.message()
60+
))
5761
.collect(Collectors.toList());
62+
5863
// Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
59-
// all requests must have the same overrideConfiguration so it is sufficient to retrieve it from the first
60-
// request.
64+
// all requests must have the same overrideConfiguration, so it is sufficient to retrieve it from the first request.
6165
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0).message()
6266
.overrideConfiguration();
67+
6368
return overrideConfiguration.map(
64-
overrideConfig -> DeleteMessageBatchRequest.builder().queueUrl(batchKey).overrideConfiguration(overrideConfig)
65-
.entries(entries).build()).orElse(
66-
DeleteMessageBatchRequest.builder().queueUrl(batchKey).entries(entries).build());
69+
overrideConfig -> DeleteMessageBatchRequest.builder()
70+
.queueUrl(batchKey)
71+
.overrideConfiguration(
72+
overrideConfig.toBuilder()
73+
.applyMutation(USER_AGENT_APPLIER)
74+
.build()
75+
)
76+
.entries(entries)
77+
.build()
78+
).orElseGet(
79+
() -> DeleteMessageBatchRequest.builder()
80+
.queueUrl(batchKey)
81+
.overrideConfiguration(o ->
82+
o.applyMutation(USER_AGENT_APPLIER).build()
83+
)
84+
.entries(entries)
85+
.build()
86+
);
6787
}
6888

6989
private static DeleteMessageBatchRequestEntry createDeleteMessageBatchRequestEntry(String id, DeleteMessageRequest request) {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/QueueAttributesManager.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;
19+
1820
import java.time.Duration;
1921
import java.util.Arrays;
2022
import java.util.List;
@@ -138,22 +140,21 @@ private CompletableFuture<Map<QueueAttributeName, String>> fetchQueueAttributes(
138140
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
139141
.queueUrl(queueUrl)
140142
.attributeNames(QUEUE_ATTRIBUTE_NAMES)
143+
.overrideConfiguration(o -> o
144+
.applyMutation(USER_AGENT_APPLIER))
141145
.build();
142146

143-
CompletableFuture<Map<QueueAttributeName, String>> future =
144-
sqsClient.getQueueAttributes(request)
145-
.thenApply(response -> {
146-
Map<QueueAttributeName, String> attributes = response.attributes();
147-
Validate.notNull(attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS),
148-
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS
149-
+ " attribute is null in SQS.");
150-
Validate.notNull(attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT),
151-
QueueAttributeName.VISIBILITY_TIMEOUT + " attribute is null in SQS.");
152-
return attributes.entrySet().stream()
153-
.filter(entry -> QUEUE_ATTRIBUTE_NAMES.contains(entry.getKey()))
154-
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
155-
});
156-
157-
return future;
147+
return sqsClient.getQueueAttributes(request)
148+
.thenApply(response -> {
149+
Map<QueueAttributeName, String> attributes = response.attributes();
150+
Validate.notNull(attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS),
151+
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS
152+
+ " attribute is null in SQS.");
153+
Validate.notNull(attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT),
154+
QueueAttributeName.VISIBILITY_TIMEOUT + " attribute is null in SQS.");
155+
return attributes.entrySet().stream()
156+
.filter(entry -> QUEUE_ATTRIBUTE_NAMES.contains(entry.getKey()))
157+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
158+
});
158159
}
159160
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveSqsMessageHelper.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

1818

19+
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;
20+
1921
import java.time.Duration;
2022
import java.util.List;
2123
import java.util.Objects;
@@ -72,7 +74,8 @@ public CompletableFuture<ReceiveSqsMessageHelper> asyncReceiveMessage() {
7274
ReceiveMessageRequest.Builder request =
7375
ReceiveMessageRequest.builder()
7476
.queueUrl(queueUrl)
75-
.maxNumberOfMessages(config.maxBatchItems());
77+
.maxNumberOfMessages(config.maxBatchItems())
78+
.overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER));
7679

7780
if (!CollectionUtils.isNullOrEmpty(config.messageSystemAttributeNames())) {
7881
request.messageSystemAttributeNames(config.messageSystemAttributeNames());
@@ -158,10 +161,12 @@ private CompletableFuture<ChangeMessageVisibilityBatchResponse> nackMessages() {
158161
.build())
159162
.collect(Collectors.toList());
160163

161-
ChangeMessageVisibilityBatchRequest batchRequest = ChangeMessageVisibilityBatchRequest.builder()
162-
.queueUrl(queueUrl)
163-
.entries(entries)
164-
.build();
164+
ChangeMessageVisibilityBatchRequest batchRequest =
165+
ChangeMessageVisibilityBatchRequest.builder()
166+
.queueUrl(queueUrl)
167+
.entries(entries)
168+
.overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER))
169+
.build();
165170

166171
return asyncClient.changeMessageVisibilityBatch(batchRequest);
167172
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SendMessageBatchManager.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.USER_AGENT_APPLIER;
19+
1820
import java.util.ArrayList;
1921
import java.util.List;
2022
import java.util.Optional;
@@ -83,19 +85,31 @@ private static IdentifiableMessage<SendMessageResponse> createSendMessageRespons
8385

8486
private static SendMessageBatchRequest createSendMessageBatchRequest(
8587
List<IdentifiableMessage<SendMessageRequest>> identifiedRequests, String batchKey) {
86-
List<SendMessageBatchRequestEntry> entries = identifiedRequests
87-
.stream()
88-
.map(identifiedRequest -> createSendMessageBatchRequestEntry(identifiedRequest.id(), identifiedRequest.message()))
89-
.collect(Collectors.toList());
90-
// Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
91-
// all requests must have the same overrideConfiguration so it is sufficient to retrieve it from the first
92-
// request.
93-
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0).message()
88+
89+
List<SendMessageBatchRequestEntry> entries =
90+
identifiedRequests.stream()
91+
.map(identifiedRequest -> createSendMessageBatchRequestEntry(identifiedRequest.id(),
92+
identifiedRequest.message()))
93+
.collect(Collectors.toList());
94+
95+
// All requests must have the same overrideConfiguration, so retrieve it from the first request.
96+
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0)
97+
.message()
9498
.overrideConfiguration();
95-
return overrideConfiguration.map(
96-
overrideConfig -> SendMessageBatchRequest.builder().queueUrl(batchKey).overrideConfiguration(overrideConfig)
97-
.entries(entries).build()).orElse(
98-
SendMessageBatchRequest.builder().queueUrl(batchKey).entries(entries).build());
99+
100+
return overrideConfiguration
101+
.map(overrideConfig -> SendMessageBatchRequest.builder()
102+
.queueUrl(batchKey)
103+
.overrideConfiguration(overrideConfig.toBuilder()
104+
.applyMutation(USER_AGENT_APPLIER)
105+
.build())
106+
.entries(entries)
107+
.build())
108+
.orElseGet(() -> SendMessageBatchRequest.builder()
109+
.queueUrl(batchKey)
110+
.overrideConfiguration(o -> o.applyMutation(USER_AGENT_APPLIER))
111+
.entries(entries)
112+
.build());
99113
}
100114

101115
private static SendMessageBatchRequestEntry createSendMessageBatchRequestEntry(String id, SendMessageRequest request) {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsMessageDefault.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,10 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import java.util.function.Consumer;
1819
import software.amazon.awssdk.annotations.SdkInternalApi;
20+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
21+
import software.amazon.awssdk.core.ApiName;
1922

2023
@SdkInternalApi
2124
public final class SqsMessageDefault {
@@ -24,6 +27,11 @@ public final class SqsMessageDefault {
2427

2528
public static final int MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES = 262_144; // 256 KiB
2629

30+
// abm stands for Automatic Batching Manager
31+
public static final Consumer<AwsRequestOverrideConfiguration.Builder> USER_AGENT_APPLIER =
32+
b -> b.addApiName(ApiName.builder().version("abm").name("hll").build());
33+
34+
2735
/**
2836
* <a href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes">
2937
* AWS SQS Message Attributes Documentation</a>

0 commit comments

Comments
 (0)