-
Notifications
You must be signed in to change notification settings - Fork 914
Receive Batch Manager Implementation #5488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
joviegas
merged 18 commits into
feature/master/sqs-batch-manager
from
joviegas/receive_buffer_queue
Aug 27, 2024
Merged
Changes from all commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
b8734a2
Add Recieve Buffer Queue And its related configuration
joviegas ca6cf94
Merge branch 'feature/master/sqs-batch-manager' into joviegas/receive…
joviegas ed57320
Update ReceiveBatch manager
joviegas 79b93ec
Recieve Batch Manager Implementation
joviegas 6ff4a77
Receive Batch Manager Implemetation
joviegas 6e47217
Handled review comments
joviegas 766fbef
Checkstyle failure
joviegas 8a0fb75
Flsky test case fixed
joviegas 18d7af2
Flaky test case fixed
joviegas ef0d98c
Hamdled review comments
joviegas 61db2c6
Handled comments
joviegas 239c4f1
Merge branch 'feature/master/sqs-batch-manager' into joviegas/receive…
joviegas f510b82
Removed ReceiveMessageCompletableFuture
joviegas 1a0b987
SdkClosable implemented
joviegas e0f7b34
Added ReceiveMessageBatchManager class for completeness
joviegas 87eef9f
Checkstyle issues
joviegas ac17c3d
Null checks
joviegas f6031c7
Handled comments from Zoe
joviegas File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
319 changes: 287 additions & 32 deletions
319
...ain/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
159 changes: 159 additions & 0 deletions
159
...ava/software/amazon/awssdk/services/sqs/internal/batchmanager/QueueAttributesManager.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,159 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.services.sqs.internal.batchmanager; | ||
|
||
import java.time.Duration; | ||
import java.util.Arrays; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.atomic.AtomicReference; | ||
import java.util.stream.Collectors; | ||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.services.sqs.SqsAsyncClient; | ||
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; | ||
import software.amazon.awssdk.services.sqs.model.QueueAttributeName; | ||
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; | ||
import software.amazon.awssdk.utils.CompletableFutureUtils; | ||
import software.amazon.awssdk.utils.Validate; | ||
|
||
|
||
/** | ||
* The {@code QueueAttributesManager} class is responsible for managing and retrieving specific attributes | ||
* of an AWS SQS queue, such as message wait time and visibility timeout. It efficiently caches these attributes | ||
* to minimize redundant API calls to SQS, ensuring that the attributes are fetched only once and reused in subsequent requests. | ||
* | ||
* <p>This class uses an {@link AtomicReference} to maintain the state of the attribute map, allowing concurrent access | ||
* and handling cases where the fetching of attributes may fail. If an error occurs during the retrieval of attributes, | ||
* the state is reset to allow for a fresh attempt in subsequent calls.</p> | ||
* | ||
* <p>The class provides methods to get the visibility timeout and calculate the message receive timeout, which | ||
* are asynchronously retrieved and processed using {@link CompletableFuture}. These methods handle cancellation | ||
* scenarios by cancelling the SQS request if the calling future is cancelled.</p> | ||
* | ||
* <p>This class is intended for internal use and is marked with the {@link SdkInternalApi} annotation.</p> | ||
*/ | ||
@SdkInternalApi | ||
public final class QueueAttributesManager { | ||
|
||
private static final List<QueueAttributeName> QUEUE_ATTRIBUTE_NAMES = | ||
Arrays.asList(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, | ||
QueueAttributeName.VISIBILITY_TIMEOUT); | ||
private final SqsAsyncClient sqsClient; | ||
private final String queueUrl; | ||
private final AtomicReference<CompletableFuture<Map<QueueAttributeName, String>>> queueAttributeMap = new AtomicReference<>(); | ||
|
||
public QueueAttributesManager(SqsAsyncClient sqsClient, String queueUrl) { | ||
this.sqsClient = sqsClient; | ||
this.queueUrl = queueUrl; | ||
} | ||
|
||
/** | ||
* Retrieves the received message timeout based on the provided request and queue attributes. | ||
* | ||
* @param rq The receive message request | ||
* @param configuredWaitTime The configured minimum wait time | ||
* @return CompletableFuture with the calculated receive message timeout in milliseconds | ||
*/ | ||
public CompletableFuture<Duration> getReceiveMessageTimeout(ReceiveMessageRequest rq, Duration configuredWaitTime) { | ||
Integer waitTimeSeconds = rq.waitTimeSeconds(); | ||
if (waitTimeSeconds != null) { | ||
long waitTimeMillis = TimeUnit.SECONDS.toMillis(waitTimeSeconds); | ||
return CompletableFuture.completedFuture(Duration.ofMillis(Math.max(configuredWaitTime.toMillis(), waitTimeMillis))); | ||
} | ||
|
||
CompletableFuture<Map<QueueAttributeName, String>> attributeFuture = getAttributeMap(); | ||
CompletableFuture<Duration> resultFuture = attributeFuture.thenApply(attributes -> { | ||
String waitTimeSecondsStr = attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS); | ||
long waitTimeFromSqsMillis = TimeUnit.SECONDS.toMillis(Long.parseLong(waitTimeSecondsStr)); | ||
return Duration.ofMillis(Math.max(configuredWaitTime.toMillis(), waitTimeFromSqsMillis)); | ||
}); | ||
|
||
return CompletableFutureUtils.forwardExceptionTo(resultFuture, attributeFuture); | ||
} | ||
|
||
/** | ||
* Retrieves the visibility timeout for the queue. | ||
* | ||
* @return CompletableFuture with the visibility timeout in nanoseconds | ||
*/ | ||
public CompletableFuture<Duration> getVisibilityTimeout() { | ||
CompletableFuture<Map<QueueAttributeName, String>> attributeFuture = getAttributeMap(); | ||
CompletableFuture<Duration> resultFuture = attributeFuture.thenApply(attributes -> { | ||
String visibilityTimeoutStr = attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT); | ||
return Duration.ofSeconds(Integer.parseInt(visibilityTimeoutStr)); | ||
}); | ||
|
||
return CompletableFutureUtils.forwardExceptionTo(resultFuture, attributeFuture); | ||
} | ||
|
||
/** | ||
* Retrieves the queue attributes based on the predefined attribute names. | ||
* | ||
* @return CompletableFuture with the map of attribute names and their values. | ||
*/ | ||
private CompletableFuture<Map<QueueAttributeName, String>> getAttributeMap() { | ||
CompletableFuture<Map<QueueAttributeName, String>> future = queueAttributeMap.get(); | ||
|
||
if (future == null || future.isCompletedExceptionally()) { | ||
CompletableFuture<Map<QueueAttributeName, String>> newFuture = new CompletableFuture<>(); | ||
|
||
if (queueAttributeMap.compareAndSet(future, newFuture)) { | ||
fetchQueueAttributes().whenComplete((r, t) -> { | ||
if (t != null) { | ||
newFuture.completeExceptionally(t); | ||
} else { | ||
newFuture.complete(r); | ||
} | ||
}); | ||
return newFuture; | ||
} else { | ||
newFuture.cancel(true); | ||
return queueAttributeMap.get(); | ||
} | ||
} | ||
return future; | ||
} | ||
|
||
/** | ||
* Fetches the queue attributes from SQS and completes the provided future with the result. | ||
* | ||
* @return CompletableFuture with the map of attribute names and values. | ||
*/ | ||
private CompletableFuture<Map<QueueAttributeName, String>> fetchQueueAttributes() { | ||
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder() | ||
.queueUrl(queueUrl) | ||
.attributeNames(QUEUE_ATTRIBUTE_NAMES) | ||
.build(); | ||
|
||
CompletableFuture<Map<QueueAttributeName, String>> future = | ||
sqsClient.getQueueAttributes(request) | ||
.thenApply(response -> { | ||
Map<QueueAttributeName, String> attributes = response.attributes(); | ||
Validate.notNull(attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS), | ||
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS | ||
+ " attribute is null in SQS."); | ||
Validate.notNull(attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT), | ||
QueueAttributeName.VISIBILITY_TIMEOUT + " attribute is null in SQS."); | ||
return attributes.entrySet().stream() | ||
.filter(entry -> QUEUE_ATTRIBUTE_NAMES.contains(entry.getKey())) | ||
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); | ||
}); | ||
|
||
return future; | ||
} | ||
} |
76 changes: 76 additions & 0 deletions
76
...n/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.services.sqs.internal.batchmanager; | ||
|
||
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SUPPORTED_SQS_RECEIVE_MSG; | ||
|
||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.function.Function; | ||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.services.sqs.SqsAsyncClient; | ||
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; | ||
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; | ||
import software.amazon.awssdk.utils.SdkAutoCloseable; | ||
|
||
@SdkInternalApi | ||
public class ReceiveBatchManager implements SdkAutoCloseable { | ||
|
||
private final SqsAsyncClient sqsClient; | ||
private final ScheduledExecutorService executor; | ||
private final ResponseBatchConfiguration config; | ||
private final String queueUrl; | ||
private final ReceiveQueueBuffer receiveQueueBuffer; | ||
private final QueueAttributesManager queueAttributesManager; | ||
|
||
public ReceiveBatchManager(SqsAsyncClient sqsClient, ScheduledExecutorService executor, ResponseBatchConfiguration config, | ||
String queueUrl) { | ||
this.sqsClient = sqsClient; | ||
this.executor = executor; | ||
this.config = config; | ||
this.queueUrl = queueUrl; | ||
this.queueAttributesManager = new QueueAttributesManager(sqsClient, queueUrl); | ||
this.receiveQueueBuffer = ReceiveQueueBuffer.builder() | ||
.executor(executor) | ||
.sqsClient(sqsClient) | ||
.config(config) | ||
.queueUrl(queueUrl) | ||
.queueAttributesManager(queueAttributesManager).build(); | ||
} | ||
|
||
public CompletableFuture<ReceiveMessageResponse> processRequest(ReceiveMessageRequest rq) { | ||
if (receiveQueueBuffer.isShutDown()) { | ||
throw new IllegalStateException("The client has been shut down."); | ||
} | ||
int numMessages = rq.maxNumberOfMessages() != null ? rq.maxNumberOfMessages() : MAX_SUPPORTED_SQS_RECEIVE_MSG; | ||
|
||
return queueAttributesManager.getReceiveMessageTimeout(rq, config.minReceiveWaitTime()).thenCompose(waitTimeMs -> { | ||
CompletableFuture<ReceiveMessageResponse> receiveMessageFuture = new CompletableFuture<>(); | ||
receiveQueueBuffer.receiveMessage(receiveMessageFuture, numMessages); | ||
CompletableFuture<ReceiveMessageResponse> timeoutFuture = new CompletableFuture<>(); | ||
executor.schedule(() -> timeoutFuture.complete(ReceiveMessageResponse.builder().build()), waitTimeMs.toMillis(), | ||
TimeUnit.MILLISECONDS); | ||
return receiveMessageFuture.applyToEither(timeoutFuture, Function.identity()); | ||
|
||
}); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
receiveQueueBuffer.close(); | ||
} | ||
} |
105 changes: 105 additions & 0 deletions
105
...software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
/* | ||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"). | ||
* You may not use this file except in compliance with the License. | ||
* A copy of the License is located at | ||
* | ||
* http://aws.amazon.com/apache2.0 | ||
* | ||
* or in the "license" file accompanying this file. This file is distributed | ||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either | ||
* express or implied. See the License for the specific language governing | ||
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.services.sqs.internal.batchmanager; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.CompletableFuture; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import software.amazon.awssdk.annotations.SdkInternalApi; | ||
import software.amazon.awssdk.services.sqs.SqsAsyncClient; | ||
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration; | ||
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; | ||
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; | ||
import software.amazon.awssdk.utils.Logger; | ||
import software.amazon.awssdk.utils.SdkAutoCloseable; | ||
|
||
@SdkInternalApi | ||
public class ReceiveMessageBatchManager implements SdkAutoCloseable { | ||
|
||
private static final Logger log = Logger.loggerFor(ReceiveMessageBatchManager.class); | ||
|
||
private final SqsAsyncClient sqsClient; | ||
private final ScheduledExecutorService executor; | ||
private final ResponseBatchConfiguration config; | ||
private final Map<String, ReceiveBatchManager> receiveBatchManagerMap = new ConcurrentHashMap<>(); | ||
|
||
public ReceiveMessageBatchManager(SqsAsyncClient sqsClient, | ||
ScheduledExecutorService executor, | ||
BatchOverrideConfiguration config) { | ||
this.sqsClient = sqsClient; | ||
this.executor = executor; | ||
this.config = new ResponseBatchConfiguration(config); | ||
} | ||
|
||
public CompletableFuture<ReceiveMessageResponse> batchRequest(ReceiveMessageRequest request) { | ||
if (canBeRetrievedFromQueueBuffer(request)) { | ||
return receiveBatchManagerMap.computeIfAbsent(generateBatchKey(request), key -> createReceiveBatchManager(request)) | ||
.processRequest(request); | ||
} else { | ||
log.debug(() -> "canBeRetrievedFromQueueBuffer failed, so skipping batching for request for Queue with URL: " | ||
+ request.queueUrl()); | ||
return sqsClient.receiveMessage(request); | ||
} | ||
} | ||
|
||
/** | ||
* Generates a unique key for batch processing based on the queue URL and any override configuration. | ||
* | ||
* @param request The receive message request. | ||
* @return The generated batch key. | ||
*/ | ||
private String generateBatchKey(ReceiveMessageRequest request) { | ||
return request.overrideConfiguration() | ||
.map(config -> request.queueUrl() + config.hashCode()) | ||
.orElse(request.queueUrl()); | ||
} | ||
|
||
private ReceiveBatchManager createReceiveBatchManager(ReceiveMessageRequest request) { | ||
return new ReceiveBatchManager(sqsClient, executor, config, request.queueUrl()); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
receiveBatchManagerMap.values().forEach(ReceiveBatchManager::close); | ||
} | ||
|
||
private boolean canBeRetrievedFromQueueBuffer(ReceiveMessageRequest rq) { | ||
return hasCompatibleAttributes(rq) && isBufferingEnabled() && rq.visibilityTimeout() == null; | ||
} | ||
|
||
|
||
private boolean hasCompatibleAttributes(ReceiveMessageRequest rq) { | ||
return !rq.hasAttributeNames() | ||
&& hasCompatibleSystemAttributes(rq) | ||
&& hasCompatibleMessageAttributes(rq); | ||
} | ||
|
||
private boolean hasCompatibleSystemAttributes(ReceiveMessageRequest rq) { | ||
return !rq.hasMessageSystemAttributeNames() | ||
|| config.messageSystemAttributeNames().equals(rq.messageSystemAttributeNames()); | ||
} | ||
|
||
private boolean hasCompatibleMessageAttributes(ReceiveMessageRequest rq) { | ||
return !rq.hasMessageAttributeNames() | ||
|| config.receiveMessageAttributeNames().equals(rq.messageAttributeNames()); | ||
} | ||
|
||
private boolean isBufferingEnabled() { | ||
return config.maxInflightReceiveBatches() > 0 && config.maxDoneReceiveBatches() > 0; | ||
} | ||
|
||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.