Skip to content

Commit 79b93ec

Browse files
committed
Recieve Batch Manager Implementation
1 parent ed57320 commit 79b93ec

File tree

8 files changed

+296
-161
lines changed

8 files changed

+296
-161
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/AsyncReceiveMessageBatch.java

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,14 @@
2121
import java.util.ArrayList;
2222
import java.util.List;
2323
import java.util.Optional;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.ScheduledExecutorService;
26+
import java.util.concurrent.atomic.AtomicBoolean;
2427
import java.util.stream.Collectors;
2528
import java.util.stream.IntStream;
29+
import software.amazon.awssdk.annotations.SdkInternalApi;
2630
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2731
import software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration;
28-
29-
import java.util.concurrent.CompletableFuture;
30-
import java.util.concurrent.ScheduledExecutorService;
31-
import java.util.concurrent.atomic.AtomicBoolean;
3232
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
3333
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
3434
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
@@ -39,12 +39,14 @@
3939
/**
4040
* The {@code AsyncReceiveMessageBatch} class forms a {@link ReceiveMessageRequest} request based on configuration settings,
4141
* collects messages from an AWS SQS queue, and handles exceptions during the process.
42-
*
42+
* <p>
4343
* It manages message visibility timeout by tracking the visibility deadline and expiring messages if not processed in time,
4444
* ensuring unhandled messages return to the queue for reprocessing.
45-
*
45+
* <p>
4646
* Additionally, the class supports clearing messages in the batch and changing their visibility as needed.
4747
*/
48+
49+
@SdkInternalApi
4850
public class AsyncReceiveMessageBatch {
4951

5052
private static final Logger log = Logger.loggerFor(AsyncReceiveMessageBatch.class);
@@ -69,29 +71,30 @@ public AsyncReceiveMessageBatch(ScheduledExecutorService scheduledExecutorServic
6971
}
7072

7173
public CompletableFuture<AsyncReceiveMessageBatch> asyncReceiveMessage() {
72-
ReceiveMessageRequest.Builder request = ReceiveMessageRequest.builder()
73-
.queueUrl(queueUrl)
74-
.maxNumberOfMessages(config.maxBatchItems())
75-
.messageAttributeNames(config.receiveMessageAttributeNames())
76-
.messageAttributeNames(config.receiveMessageAttributeNames());
74+
ReceiveMessageRequest.Builder request =
75+
ReceiveMessageRequest.builder()
76+
.queueUrl(queueUrl)
77+
.maxNumberOfMessages(config.maxBatchItems())
78+
.messageAttributeNames(config.receiveMessageAttributeNames())
79+
.messageAttributeNames(config.receiveMessageAttributeNames());
7780

7881
request.visibilityTimeout((int) this.visibilityTimeout.get(ChronoUnit.SECONDS));
7982

8083
if (config.longPoll()) {
8184
request.waitTimeSeconds(config.longPollWaitTimeoutSeconds());
8285
}
83-
try{
86+
try {
8487
return asyncClient.receiveMessage(request.build())
8588
.handle((response, throwable) -> {
86-
if (throwable != null) {
87-
setException(throwable);
88-
} else {
89-
messages = new ArrayList<>(response.messages());
90-
}
91-
open.set(true);
92-
return this;
93-
});
94-
}finally {
89+
if (throwable != null) {
90+
setException(throwable);
91+
} else {
92+
messages = new ArrayList<>(response.messages());
93+
}
94+
open.set(true);
95+
return this;
96+
});
97+
} finally {
9598
visibilityDeadlineNano = System.nanoTime() + visibilityTimeout.toNanos();
9699
}
97100

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageCompletableFuture.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,15 @@
1515

1616
package software.amazon.awssdk.services.sqs.batchmanager;
1717

18-
1918
import java.time.Duration;
20-
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
21-
2219
import java.util.concurrent.CompletableFuture;
2320
import java.util.concurrent.ScheduledExecutorService;
2421
import java.util.concurrent.ScheduledFuture;
2522
import java.util.concurrent.TimeUnit;
23+
import software.amazon.awssdk.annotations.SdkInternalApi;
24+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
2625

26+
@SdkInternalApi
2727
public class ReceiveMessageCompletableFuture {
2828

2929
private final long waitTimeDeadlineNano;
@@ -53,9 +53,7 @@ public void startWaitTimer(ScheduledExecutorService executorService) {
5353
}
5454
}
5555

56-
5756
public boolean isExpired() {
58-
System.out.println("System.nanoTime() > waitTimeDeadlineNano");
5957
return System.nanoTime() > waitTimeDeadlineNano;
6058
}
6159

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveQueueBuffer.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,23 @@
1515

1616
package software.amazon.awssdk.services.sqs.batchmanager;
1717

18+
import java.util.LinkedList;
1819
import java.util.List;
1920
import java.util.Queue;
2021
import java.util.concurrent.CancellationException;
2122
import java.util.concurrent.ConcurrentLinkedQueue;
23+
import java.util.concurrent.ScheduledExecutorService;
2224
import java.util.concurrent.atomic.AtomicBoolean;
2325
import java.util.concurrent.atomic.AtomicInteger;
2426
import java.util.concurrent.locks.ReentrantLock;
27+
import software.amazon.awssdk.annotations.SdkInternalApi;
2528
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2629
import software.amazon.awssdk.services.sqs.internal.batchmanager.QueueAttributesManager;
2730
import software.amazon.awssdk.services.sqs.internal.batchmanager.ResponseBatchConfiguration;
2831
import software.amazon.awssdk.services.sqs.model.Message;
2932
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
3033

31-
import java.util.LinkedList;
32-
import java.util.concurrent.ScheduledExecutorService;
33-
34+
@SdkInternalApi
3435
public class ReceiveQueueBuffer {
3536

3637
private final ScheduledExecutorService executor;
@@ -66,12 +67,12 @@ public boolean isShutDown() {
6667
return shutDown.get();
6768
}
6869

69-
public void shutdown( ) {
70+
public void shutdown() {
7071
if (this.shutDown.compareAndSet(false, true)) {
7172
// Clear all finished tasks
7273
while (!finishedTasks.isEmpty()) {
7374
AsyncReceiveMessageBatch batch = finishedTasks.poll();
74-
if(inflightReceiveMessageBatches.get() > 0) {
75+
if (inflightReceiveMessageBatches.get() > 0) {
7576
inflightReceiveMessageBatches.decrementAndGet();
7677
}
7778
if (batch != null) {
@@ -162,8 +163,7 @@ private void fulfillFuture(ReceiveMessageCompletableFuture future, AsyncReceiveM
162163
lock.unlock();
163164
}
164165
}
165-
166-
future.setSuccess(ReceiveMessageResponse.builder().messages(messages).build());
166+
future.setSuccess(ReceiveMessageResponse.builder().messages(messages).build());
167167
}
168168

169169
private void satisfyFuturesFromBuffer() {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/QueueAttributesManager.java

Lines changed: 88 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -16,139 +16,142 @@
1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

1818
import java.time.Duration;
19+
import java.util.Arrays;
20+
import java.util.List;
21+
import java.util.Map;
1922
import java.util.concurrent.CompletableFuture;
2023
import java.util.concurrent.TimeUnit;
2124
import java.util.concurrent.atomic.AtomicReference;
25+
import java.util.stream.Collectors;
2226
import software.amazon.awssdk.annotations.SdkInternalApi;
2327
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2428
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
2529
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
2630
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
2731
import software.amazon.awssdk.utils.Validate;
2832

29-
3033
@SdkInternalApi
31-
public class QueueAttributesManager {
34+
public final class QueueAttributesManager {
3235

36+
private static final List<QueueAttributeName> QUEUE_ATTRIBUTE_NAMES =
37+
Arrays.asList(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS,
38+
QueueAttributeName.VISIBILITY_TIMEOUT);
3339
private final SqsAsyncClient sqsClient;
3440
private final String queueUrl;
35-
private final AtomicReference<CompletableFuture<Duration>> defaultWaitTimeSecondsFuture = new AtomicReference<>();
36-
private final AtomicReference<CompletableFuture<Duration>> visibilityTimeoutSecondsFuture = new AtomicReference<>();
37-
private final Duration minReceiveWaitTime;
41+
private final AtomicReference<CompletableFuture<Map<QueueAttributeName, String>>> queueAttributeMap = new AtomicReference<>();
3842

39-
public QueueAttributesManager(SqsAsyncClient sqsClient, String queueUrl, Duration minReceiveWaitTime) {
43+
public QueueAttributesManager(SqsAsyncClient sqsClient, String queueUrl) {
4044
this.sqsClient = sqsClient;
4145
this.queueUrl = queueUrl;
42-
this.minReceiveWaitTime = minReceiveWaitTime;
4346
}
4447

4548
/**
4649
* Retrieves the received message timeout based on the provided request and queue attributes.
4750
*
48-
* @param rq The receive message request
51+
* @param rq The receive message request
52+
* @param configuredWaitTime The configured minimum wait time
4953
* @return CompletableFuture with the calculated receive message timeout in milliseconds
5054
*/
51-
public CompletableFuture<Duration> getReceiveMessageTimeout(ReceiveMessageRequest rq) {
52-
CompletableFuture<Duration> waitTimeFuture = defaultWaitTimeSecondsFuture.get();
55+
public CompletableFuture<Duration> getReceiveMessageTimeout(ReceiveMessageRequest rq, Duration configuredWaitTime) {
56+
Integer waitTimeSeconds = rq.waitTimeSeconds();
57+
if (waitTimeSeconds != null) {
58+
long waitTimeMillis = TimeUnit.SECONDS.toMillis(waitTimeSeconds);
59+
return CompletableFuture.completedFuture(Duration.ofMillis(Math.max(configuredWaitTime.toMillis(), waitTimeMillis)));
60+
}
61+
62+
CompletableFuture<Map<QueueAttributeName, String>> attributeFuture = getAttributeMap();
63+
CompletableFuture<Duration> resultFuture = attributeFuture.thenApply(attributes -> {
64+
String waitTimeSecondsStr = attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS);
65+
long waitTimeFromSqsMillis = TimeUnit.SECONDS.toMillis(Long.parseLong(waitTimeSecondsStr));
66+
return Duration.ofMillis(Math.max(configuredWaitTime.toMillis(), waitTimeFromSqsMillis));
67+
});
5368

54-
if (waitTimeFuture == null) {
55-
CompletableFuture<Duration> newWaitTimeFuture = new CompletableFuture<>();
56-
if (defaultWaitTimeSecondsFuture.compareAndSet(null, newWaitTimeFuture)) {
57-
fetchQueueWaitTime(newWaitTimeFuture);
69+
resultFuture.whenComplete((r, t) -> {
70+
if (resultFuture.isCancelled()) {
71+
attributeFuture.cancel(true);
5872
}
59-
waitTimeFuture = defaultWaitTimeSecondsFuture.get();
60-
}
73+
});
6174

62-
return waitTimeFuture.thenApply(waitTime -> calculateWaitTime(rq, waitTime));
75+
return resultFuture;
6376
}
6477

6578
/**
66-
* Fetches the queue wait time from SQS and completes the provided future with the result.
79+
* Retrieves the visibility timeout for the queue.
6780
*
68-
* @param newWaitTimeFuture The future to complete with the fetched wait time
81+
* @return CompletableFuture with the visibility timeout in nanoseconds
6982
*/
70-
private void fetchQueueWaitTime(CompletableFuture<Duration> newWaitTimeFuture) {
71-
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
72-
.queueUrl(queueUrl)
73-
.attributeNames(
74-
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS)
75-
.build();
76-
sqsClient.getQueueAttributes(request)
77-
.thenApply(response -> {
78-
String messageWaitTime =
79-
Validate.notNull(response
80-
.attributes()
81-
.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS),
82-
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS +
83-
" attribute is null in sqs.");
84-
85-
return Duration.ofSeconds(Integer.parseInt(messageWaitTime));
86-
})
87-
.thenAccept(newWaitTimeFuture::complete)
88-
.exceptionally(ex -> {
89-
newWaitTimeFuture.completeExceptionally(ex);
90-
return null;
91-
});
92-
}
83+
public CompletableFuture<Duration> getVisibilityTimeout() {
84+
CompletableFuture<Map<QueueAttributeName, String>> attributeFuture = getAttributeMap();
85+
CompletableFuture<Duration> resultFuture = attributeFuture.thenApply(attributes -> {
86+
String visibilityTimeoutStr = attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT);
87+
return Duration.ofSeconds(Integer.parseInt(visibilityTimeoutStr));
88+
});
89+
90+
resultFuture.whenComplete((r, t) -> {
91+
if (resultFuture.isCancelled()) {
92+
attributeFuture.cancel(true);
93+
}
94+
});
9395

94-
/**
95-
* Calculates the wait time for receiving a message, ensuring it meets the minimum wait time.
96-
*
97-
* @param rq The receive message request
98-
* @param defaultWaitTime The default wait time from the queue attributes
99-
* @return The calculated wait time in milliseconds
100-
*/
101-
private Duration calculateWaitTime(ReceiveMessageRequest rq, Duration defaultWaitTime) {
96+
return resultFuture;
10297

103-
int waitTimeSeconds = (rq.waitTimeSeconds() != null) ? rq.waitTimeSeconds() : (int) defaultWaitTime.getSeconds();
104-
return Duration.ofMillis(Math.max(minReceiveWaitTime.toMillis(),
105-
TimeUnit.MILLISECONDS.convert(waitTimeSeconds, TimeUnit.SECONDS)));
10698
}
10799

108100
/**
109-
* Retrieves the visibility timeout for the queue.
101+
* Retrieves the queue attributes based on the predefined attribute names.
110102
*
111-
* @return CompletableFuture with the visibility timeout in nanoseconds
103+
* @return CompletableFuture with the map of attribute names and their values.
112104
*/
113-
public CompletableFuture<Duration> getVisibilityTimeout() {
114-
CompletableFuture<Duration> timeoutFuture = visibilityTimeoutSecondsFuture.get();
115-
116-
if (timeoutFuture == null) {
117-
CompletableFuture<Duration> newTimeoutFuture = new CompletableFuture<>();
118-
if (visibilityTimeoutSecondsFuture.compareAndSet(null, newTimeoutFuture)) {
119-
fetchVisibilityTimeout(newTimeoutFuture);
105+
private CompletableFuture<Map<QueueAttributeName, String>> getAttributeMap() {
106+
CompletableFuture<Map<QueueAttributeName, String>> future = queueAttributeMap.get();
107+
108+
if (future == null || future.isCompletedExceptionally()) {
109+
CompletableFuture<Map<QueueAttributeName, String>> newFuture = fetchQueueAttributes();
110+
if (queueAttributeMap.compareAndSet(future, newFuture)) {
111+
newFuture.whenComplete((r, t) -> {
112+
if (t != null) {
113+
queueAttributeMap.compareAndSet(newFuture, null);
114+
}
115+
});
116+
return newFuture;
120117
}
121-
timeoutFuture = visibilityTimeoutSecondsFuture.get();
118+
return queueAttributeMap.get();
122119
}
123120

124-
return timeoutFuture;
121+
return future;
125122
}
126123

124+
127125
/**
128-
* Fetches the visibility timeout from SQS and completes the provided future with the result.
126+
* Fetches the queue attributes from SQS and completes the provided future with the result.
129127
*
130-
* @param newTimeoutFuture The future to complete with the fetched visibility timeout
128+
* @return CompletableFuture with the map of attribute names and values.
131129
*/
132-
private void fetchVisibilityTimeout(CompletableFuture<Duration> newTimeoutFuture) {
130+
private CompletableFuture<Map<QueueAttributeName, String>> fetchQueueAttributes() {
133131
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
134132
.queueUrl(queueUrl)
135-
.attributeNames(QueueAttributeName.VISIBILITY_TIMEOUT)
133+
.attributeNames(QUEUE_ATTRIBUTE_NAMES)
136134
.build();
137-
sqsClient.getQueueAttributes(request)
138-
.thenApply(response -> {
139-
String visibilityTimeout =
140-
Validate.notNull(response
141-
.attributes()
142-
.get(QueueAttributeName.VISIBILITY_TIMEOUT),
143-
QueueAttributeName.VISIBILITY_TIMEOUT +
144-
" attribute is null in sqs.");
145-
146-
return Duration.ofSeconds(Integer.parseInt(visibilityTimeout));
147-
})
148-
.thenAccept(newTimeoutFuture::complete)
149-
.exceptionally(ex -> {
150-
newTimeoutFuture.completeExceptionally(ex);
151-
return null;
152-
});
135+
136+
CompletableFuture<Map<QueueAttributeName, String>> future =
137+
sqsClient.getQueueAttributes(request)
138+
.thenApply(response -> {
139+
Map<QueueAttributeName, String> attributes = response.attributes();
140+
Validate.notNull(attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS),
141+
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS
142+
+ " attribute is null in SQS.");
143+
Validate.notNull(attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT),
144+
QueueAttributeName.VISIBILITY_TIMEOUT + " attribute is null in SQS.");
145+
return attributes.entrySet().stream()
146+
.filter(entry -> QUEUE_ATTRIBUTE_NAMES.contains(entry.getKey()))
147+
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
148+
});
149+
150+
future.whenComplete((r, t) -> {
151+
if (t != null) {
152+
queueAttributeMap.set(null); // Reset the future on failure
153+
}
154+
});
155+
return future;
153156
}
154-
}
157+
}

0 commit comments

Comments
 (0)