Skip to content

Commit 18c06cc

Browse files
authored
Receive Batch Manager Implementation (#5488)
* Add Recieve Buffer Queue And its related configuration * Update ReceiveBatch manager * Recieve Batch Manager Implementation * Receive Batch Manager Implemetation * Handled review comments * Checkstyle failure * Flsky test case fixed * Flaky test case fixed * Hamdled review comments * Handled comments * Removed ReceiveMessageCompletableFuture * SdkClosable implemented * Added ReceiveMessageBatchManager class for completeness * Checkstyle issues * Null checks * Handled comments from Zoe
1 parent 21642b1 commit 18c06cc

27 files changed

+2886
-76
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java

Lines changed: 287 additions & 32 deletions
Large diffs are not rendered by default.

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.util.concurrent.CompletableFuture;
1919
import java.util.concurrent.ScheduledExecutorService;
2020
import software.amazon.awssdk.annotations.SdkInternalApi;
21-
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2221
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2322
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
2423
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
@@ -43,6 +42,8 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
4342

4443
private final ChangeMessageVisibilityBatchManager changeMessageVisibilityBatchManager;
4544

45+
private final ReceiveMessageBatchManager receiveMessageBatchManager;
46+
4647
private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
4748
this.client = Validate.notNull(builder.client, "client cannot be null");
4849

@@ -57,19 +58,9 @@ private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
5758
this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(builder.overrideConfiguration,
5859
scheduledExecutor,
5960
client);
60-
//TODO : this will be updated while implementing the Receive Message Batch Manager
61-
}
6261

63-
@SdkTestInternalApi
64-
public DefaultSqsAsyncBatchManager(
65-
SqsAsyncClient client,
66-
SendMessageBatchManager sendMessageBatchManager,
67-
DeleteMessageBatchManager deleteMessageBatchManager,
68-
ChangeMessageVisibilityBatchManager changeMessageVisibilityBatchManager) {
69-
this.sendMessageBatchManager = sendMessageBatchManager;
70-
this.deleteMessageBatchManager = deleteMessageBatchManager;
71-
this.changeMessageVisibilityBatchManager = changeMessageVisibilityBatchManager;
72-
this.client = client;
62+
this.receiveMessageBatchManager = new ReceiveMessageBatchManager(client, scheduledExecutor,
63+
builder.overrideConfiguration);
7364
}
7465

7566
@Override
@@ -89,7 +80,7 @@ public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibilit
8980

9081
@Override
9182
public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRequest request) {
92-
return null;
83+
return this.receiveMessageBatchManager.batchRequest(request);
9384
}
9485

9586
public static SqsAsyncBatchManager.Builder builder() {
@@ -101,6 +92,7 @@ public void close() {
10192
sendMessageBatchManager.close();
10293
deleteMessageBatchManager.close();
10394
changeMessageVisibilityBatchManager.close();
95+
receiveMessageBatchManager.close();
10496
}
10597

10698
public static final class DefaultBuilder implements SqsAsyncBatchManager.Builder {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.internal.batchmanager;
17+
18+
import java.time.Duration;
19+
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.atomic.AtomicReference;
25+
import java.util.stream.Collectors;
26+
import software.amazon.awssdk.annotations.SdkInternalApi;
27+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
28+
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
29+
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
30+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
31+
import software.amazon.awssdk.utils.CompletableFutureUtils;
32+
import software.amazon.awssdk.utils.Validate;
33+
34+
35+
/**
36+
* The {@code QueueAttributesManager} class is responsible for managing and retrieving specific attributes
37+
* of an AWS SQS queue, such as message wait time and visibility timeout. It efficiently caches these attributes
38+
* to minimize redundant API calls to SQS, ensuring that the attributes are fetched only once and reused in subsequent requests.
39+
*
40+
* <p>This class uses an {@link AtomicReference} to maintain the state of the attribute map, allowing concurrent access
41+
* and handling cases where the fetching of attributes may fail. If an error occurs during the retrieval of attributes,
42+
* the state is reset to allow for a fresh attempt in subsequent calls.</p>
43+
*
44+
* <p>The class provides methods to get the visibility timeout and calculate the message receive timeout, which
45+
* are asynchronously retrieved and processed using {@link CompletableFuture}. These methods handle cancellation
46+
* scenarios by cancelling the SQS request if the calling future is cancelled.</p>
47+
*
48+
* <p>This class is intended for internal use and is marked with the {@link SdkInternalApi} annotation.</p>
49+
*/
50+
@SdkInternalApi
51+
public final class QueueAttributesManager {
52+
53+
private static final List<QueueAttributeName> QUEUE_ATTRIBUTE_NAMES =
54+
Arrays.asList(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS,
55+
QueueAttributeName.VISIBILITY_TIMEOUT);
56+
private final SqsAsyncClient sqsClient;
57+
private final String queueUrl;
58+
private final AtomicReference<CompletableFuture<Map<QueueAttributeName, String>>> queueAttributeMap = new AtomicReference<>();
59+
60+
public QueueAttributesManager(SqsAsyncClient sqsClient, String queueUrl) {
61+
this.sqsClient = sqsClient;
62+
this.queueUrl = queueUrl;
63+
}
64+
65+
/**
66+
* Retrieves the received message timeout based on the provided request and queue attributes.
67+
*
68+
* @param rq The receive message request
69+
* @param configuredWaitTime The configured minimum wait time
70+
* @return CompletableFuture with the calculated receive message timeout in milliseconds
71+
*/
72+
public CompletableFuture<Duration> getReceiveMessageTimeout(ReceiveMessageRequest rq, Duration configuredWaitTime) {
73+
Integer waitTimeSeconds = rq.waitTimeSeconds();
74+
if (waitTimeSeconds != null) {
75+
long waitTimeMillis = TimeUnit.SECONDS.toMillis(waitTimeSeconds);
76+
return CompletableFuture.completedFuture(Duration.ofMillis(Math.max(configuredWaitTime.toMillis(), waitTimeMillis)));
77+
}
78+
79+
CompletableFuture<Map<QueueAttributeName, String>> attributeFuture = getAttributeMap();
80+
CompletableFuture<Duration> resultFuture = attributeFuture.thenApply(attributes -> {
81+
String waitTimeSecondsStr = attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS);
82+
long waitTimeFromSqsMillis = TimeUnit.SECONDS.toMillis(Long.parseLong(waitTimeSecondsStr));
83+
return Duration.ofMillis(Math.max(configuredWaitTime.toMillis(), waitTimeFromSqsMillis));
84+
});
85+
86+
return CompletableFutureUtils.forwardExceptionTo(resultFuture, attributeFuture);
87+
}
88+
89+
/**
90+
* Retrieves the visibility timeout for the queue.
91+
*
92+
* @return CompletableFuture with the visibility timeout in nanoseconds
93+
*/
94+
public CompletableFuture<Duration> getVisibilityTimeout() {
95+
CompletableFuture<Map<QueueAttributeName, String>> attributeFuture = getAttributeMap();
96+
CompletableFuture<Duration> resultFuture = attributeFuture.thenApply(attributes -> {
97+
String visibilityTimeoutStr = attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT);
98+
return Duration.ofSeconds(Integer.parseInt(visibilityTimeoutStr));
99+
});
100+
101+
return CompletableFutureUtils.forwardExceptionTo(resultFuture, attributeFuture);
102+
}
103+
104+
/**
105+
* Retrieves the queue attributes based on the predefined attribute names.
106+
*
107+
* @return CompletableFuture with the map of attribute names and their values.
108+
*/
109+
private CompletableFuture<Map<QueueAttributeName, String>> getAttributeMap() {
110+
CompletableFuture<Map<QueueAttributeName, String>> future = queueAttributeMap.get();
111+
112+
if (future == null || future.isCompletedExceptionally()) {
113+
CompletableFuture<Map<QueueAttributeName, String>> newFuture = new CompletableFuture<>();
114+
115+
if (queueAttributeMap.compareAndSet(future, newFuture)) {
116+
fetchQueueAttributes().whenComplete((r, t) -> {
117+
if (t != null) {
118+
newFuture.completeExceptionally(t);
119+
} else {
120+
newFuture.complete(r);
121+
}
122+
});
123+
return newFuture;
124+
} else {
125+
newFuture.cancel(true);
126+
return queueAttributeMap.get();
127+
}
128+
}
129+
return future;
130+
}
131+
132+
/**
133+
* Fetches the queue attributes from SQS and completes the provided future with the result.
134+
*
135+
* @return CompletableFuture with the map of attribute names and values.
136+
*/
137+
private CompletableFuture<Map<QueueAttributeName, String>> fetchQueueAttributes() {
138+
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
139+
.queueUrl(queueUrl)
140+
.attributeNames(QUEUE_ATTRIBUTE_NAMES)
141+
.build();
142+
143+
CompletableFuture<Map<QueueAttributeName, String>> future =
144+
sqsClient.getQueueAttributes(request)
145+
.thenApply(response -> {
146+
Map<QueueAttributeName, String> attributes = response.attributes();
147+
Validate.notNull(attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS),
148+
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS
149+
+ " attribute is null in SQS.");
150+
Validate.notNull(attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT),
151+
QueueAttributeName.VISIBILITY_TIMEOUT + " attribute is null in SQS.");
152+
return attributes.entrySet().stream()
153+
.filter(entry -> QUEUE_ATTRIBUTE_NAMES.contains(entry.getKey()))
154+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
155+
});
156+
157+
return future;
158+
}
159+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.internal.batchmanager;
17+
18+
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SUPPORTED_SQS_RECEIVE_MSG;
19+
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.ScheduledExecutorService;
22+
import java.util.concurrent.TimeUnit;
23+
import java.util.function.Function;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
26+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
27+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
28+
import software.amazon.awssdk.utils.SdkAutoCloseable;
29+
30+
@SdkInternalApi
31+
public class ReceiveBatchManager implements SdkAutoCloseable {
32+
33+
private final SqsAsyncClient sqsClient;
34+
private final ScheduledExecutorService executor;
35+
private final ResponseBatchConfiguration config;
36+
private final String queueUrl;
37+
private final ReceiveQueueBuffer receiveQueueBuffer;
38+
private final QueueAttributesManager queueAttributesManager;
39+
40+
public ReceiveBatchManager(SqsAsyncClient sqsClient, ScheduledExecutorService executor, ResponseBatchConfiguration config,
41+
String queueUrl) {
42+
this.sqsClient = sqsClient;
43+
this.executor = executor;
44+
this.config = config;
45+
this.queueUrl = queueUrl;
46+
this.queueAttributesManager = new QueueAttributesManager(sqsClient, queueUrl);
47+
this.receiveQueueBuffer = ReceiveQueueBuffer.builder()
48+
.executor(executor)
49+
.sqsClient(sqsClient)
50+
.config(config)
51+
.queueUrl(queueUrl)
52+
.queueAttributesManager(queueAttributesManager).build();
53+
}
54+
55+
public CompletableFuture<ReceiveMessageResponse> processRequest(ReceiveMessageRequest rq) {
56+
if (receiveQueueBuffer.isShutDown()) {
57+
throw new IllegalStateException("The client has been shut down.");
58+
}
59+
int numMessages = rq.maxNumberOfMessages() != null ? rq.maxNumberOfMessages() : MAX_SUPPORTED_SQS_RECEIVE_MSG;
60+
61+
return queueAttributesManager.getReceiveMessageTimeout(rq, config.minReceiveWaitTime()).thenCompose(waitTimeMs -> {
62+
CompletableFuture<ReceiveMessageResponse> receiveMessageFuture = new CompletableFuture<>();
63+
receiveQueueBuffer.receiveMessage(receiveMessageFuture, numMessages);
64+
CompletableFuture<ReceiveMessageResponse> timeoutFuture = new CompletableFuture<>();
65+
executor.schedule(() -> timeoutFuture.complete(ReceiveMessageResponse.builder().build()), waitTimeMs.toMillis(),
66+
TimeUnit.MILLISECONDS);
67+
return receiveMessageFuture.applyToEither(timeoutFuture, Function.identity());
68+
69+
});
70+
}
71+
72+
@Override
73+
public void close() {
74+
receiveQueueBuffer.close();
75+
}
76+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.internal.batchmanager;
17+
18+
import java.util.Map;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.concurrent.ScheduledExecutorService;
22+
import software.amazon.awssdk.annotations.SdkInternalApi;
23+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
24+
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
25+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
26+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
27+
import software.amazon.awssdk.utils.Logger;
28+
import software.amazon.awssdk.utils.SdkAutoCloseable;
29+
30+
@SdkInternalApi
31+
public class ReceiveMessageBatchManager implements SdkAutoCloseable {
32+
33+
private static final Logger log = Logger.loggerFor(ReceiveMessageBatchManager.class);
34+
35+
private final SqsAsyncClient sqsClient;
36+
private final ScheduledExecutorService executor;
37+
private final ResponseBatchConfiguration config;
38+
private final Map<String, ReceiveBatchManager> receiveBatchManagerMap = new ConcurrentHashMap<>();
39+
40+
public ReceiveMessageBatchManager(SqsAsyncClient sqsClient,
41+
ScheduledExecutorService executor,
42+
BatchOverrideConfiguration config) {
43+
this.sqsClient = sqsClient;
44+
this.executor = executor;
45+
this.config = new ResponseBatchConfiguration(config);
46+
}
47+
48+
public CompletableFuture<ReceiveMessageResponse> batchRequest(ReceiveMessageRequest request) {
49+
if (canBeRetrievedFromQueueBuffer(request)) {
50+
return receiveBatchManagerMap.computeIfAbsent(generateBatchKey(request), key -> createReceiveBatchManager(request))
51+
.processRequest(request);
52+
} else {
53+
log.debug(() -> "canBeRetrievedFromQueueBuffer failed, so skipping batching for request for Queue with URL: "
54+
+ request.queueUrl());
55+
return sqsClient.receiveMessage(request);
56+
}
57+
}
58+
59+
/**
60+
* Generates a unique key for batch processing based on the queue URL and any override configuration.
61+
*
62+
* @param request The receive message request.
63+
* @return The generated batch key.
64+
*/
65+
private String generateBatchKey(ReceiveMessageRequest request) {
66+
return request.overrideConfiguration()
67+
.map(config -> request.queueUrl() + config.hashCode())
68+
.orElse(request.queueUrl());
69+
}
70+
71+
private ReceiveBatchManager createReceiveBatchManager(ReceiveMessageRequest request) {
72+
return new ReceiveBatchManager(sqsClient, executor, config, request.queueUrl());
73+
}
74+
75+
@Override
76+
public void close() {
77+
receiveBatchManagerMap.values().forEach(ReceiveBatchManager::close);
78+
}
79+
80+
private boolean canBeRetrievedFromQueueBuffer(ReceiveMessageRequest rq) {
81+
return hasCompatibleAttributes(rq) && isBufferingEnabled() && rq.visibilityTimeout() == null;
82+
}
83+
84+
85+
private boolean hasCompatibleAttributes(ReceiveMessageRequest rq) {
86+
return !rq.hasAttributeNames()
87+
&& hasCompatibleSystemAttributes(rq)
88+
&& hasCompatibleMessageAttributes(rq);
89+
}
90+
91+
private boolean hasCompatibleSystemAttributes(ReceiveMessageRequest rq) {
92+
return !rq.hasMessageSystemAttributeNames()
93+
|| config.messageSystemAttributeNames().equals(rq.messageSystemAttributeNames());
94+
}
95+
96+
private boolean hasCompatibleMessageAttributes(ReceiveMessageRequest rq) {
97+
return !rq.hasMessageAttributeNames()
98+
|| config.receiveMessageAttributeNames().equals(rq.messageAttributeNames());
99+
}
100+
101+
private boolean isBufferingEnabled() {
102+
return config.maxInflightReceiveBatches() > 0 && config.maxDoneReceiveBatches() > 0;
103+
}
104+
105+
}

0 commit comments

Comments
 (0)