-
Notifications
You must be signed in to change notification settings - Fork 914
Receive Batch Manager Implementation #5488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Receive Batch Manager Implementation #5488
Conversation
...java/software/amazon/awssdk/services/sqs/internal/batchmanager/AsyncReceiveMessageBatch.java
Outdated
Show resolved
Hide resolved
...java/software/amazon/awssdk/services/sqs/internal/batchmanager/AsyncReceiveMessageBatch.java
Outdated
Show resolved
Hide resolved
messages = new ArrayList<>(response.messages()); | ||
} | ||
open.set(true); | ||
return this; | ||
}); | ||
} finally { | ||
visibilityDeadlineNano = System.nanoTime() + visibilityTimeout.toNanos(); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if this gets called multiple times? Seems like these can get out of sync; for example one request might succeed so messages
has content, but another one fails so exception
is also non-null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method is called per instance of AsyncReceiveMessageBatch. Each thread creates its own instance, so there’s no case where it can go out of sync. Since this internal API is called per instance, I can’t think of a scenario where a single instance is created and accessed by multiple threads. If you see any such case, please let me know
this.config = config; | ||
} | ||
|
||
public CompletableFuture<AsyncReceiveMessageBatch> asyncReceiveMessage() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we support calling this multiple times?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Every spawn calls create as new instances of AsyncReceiveMessageBatch
AsyncReceiveMessageBatch asyncReceiveMessageBatch = new AsyncReceiveMessageBatch(
queueUrl, sqsClient, visibilityTimeoutNanos, config);
inflightReceiveMessageBatches.incrementAndGet();
asyncReceiveMessageBatch.asyncReceiveMessage()
.whenComplete((response, exception) -> reportBatchFinished(response));
So two simultaneous calls will create two instances and do two separate reportBatchFinished which is expected.
However AsyncReceiveMessageBatch is handled for Thread safety
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By making
private volatile Queue<Message> messages = new ConcurrentLinkedQueue<>(); // Thread-safe queue
We are now able to run this mutiple times in a thread safe way
|
||
public AsyncReceiveMessageBatch(ScheduledExecutorService scheduledExecutorService, String queueUrl, | ||
SqsAsyncClient asyncClient, Duration visibilityTimeout, ResponseBatchConfiguration config) { | ||
this.scheduledExecutorService = scheduledExecutorService; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks unused
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
* permissions and limitations under the License. | ||
*/ | ||
|
||
package software.amazon.awssdk.services.sqs.BatchManager; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
capitalized package name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private final ResponseBatchConfiguration config; | ||
private final AtomicBoolean open = new AtomicBoolean(false); | ||
private volatile Throwable exception; | ||
private List<Message> messages; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the very least, this probably needs to be volatile too
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, made it volatile and used CopyOnWriteArrayList
return asyncClient.changeMessageVisibilityBatch(batchRequest); | ||
} | ||
|
||
private void checkIfOpen() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not really sure what the purpose of ensure this is "open" should be. What does "open" indicate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed this , this was redundant check to make sure we donot access the exception or messages while the receieveMessage call has not happened or in progress.
Still looking, giving some early feedback |
...n/java/software/amazon/awssdk/services/sqs/internal/batchmanager/QueueAttributesManager.java
Show resolved
Hide resolved
}); | ||
|
||
resultFuture.whenComplete((r, t) -> { | ||
if (resultFuture.isCancelled()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor: probably better to just check instanceof
on t
. If this future was cancelled, t
will have type CancellationException
.
5a887a7
to
6e47217
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Still going through the PR, left some initial comments
...c/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java
Outdated
Show resolved
Hide resolved
...c/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java
Outdated
Show resolved
Hide resolved
...c/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java
Show resolved
Hide resolved
...java/software/amazon/awssdk/services/sqs/internal/batchmanager/AsyncReceiveMessageBatch.java
Outdated
Show resolved
Hide resolved
...java/software/amazon/awssdk/services/sqs/internal/batchmanager/AsyncReceiveMessageBatch.java
Outdated
Show resolved
Hide resolved
.../main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java
Outdated
Show resolved
Hide resolved
...java/software/amazon/awssdk/services/sqs/internal/batchmanager/AsyncReceiveMessageBatch.java
Outdated
Show resolved
Hide resolved
.../main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java
Outdated
Show resolved
Hide resolved
...n/java/software/amazon/awssdk/services/sqs/internal/batchmanager/QueueAttributesManager.java
Outdated
Show resolved
Hide resolved
.../main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments, still not finished review
...c/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java
Outdated
Show resolved
Hide resolved
...c/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java
Outdated
Show resolved
Hide resolved
...c/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
/** | ||
* Define the message attributes receive calls will request. Only receive message requests that request the same set of |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand this is what used in v1, but should we use something like .receiveMessageRequestTransformer
to make it easier for customers to change other things in the request? And we can get rid of the other settings like maxBatchItems?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, Zoe. If that’s the case, we’ll need to change the behavior of the feature. Specifically, when fetching SQS messages, we would fetch all the attributes and then filter them at the request level based on the attributes passed in the request. This change would have two main impacts:
1. Diverging from V1 behavior.
2. Requiring the retrieval of all attributes, even when the user has configured only a specific number of attribute names in the configuration.
It’s important to note that we currently only respect the maxNumberOfMessages and waitTimeSeconds parameters in the request, which are specific to the request. The other parameters are referenced from the configuration settings and are queue-level settings.
Given this, we have the following options:
• We could remove receiveAttributeNames for now and add it later if there is a user request.
• We could keep it the same as V1 and provide a configuration option, as it currently exists.
• If we provide a request-level option like receiveMessageRequestTransformer, we could pull out the entire attributes map from SQS and then filter it while completing the future.
From my observation of the attributeMap, I think the current behavior—where configuration-level attributeNames are used—is intended to optimize the receiveMessage payload when it comes to the map.
Also, in my opinion, if batchManager is something I expect to operate at the batch level, then if users want specific attributes, they can use the normal receiveMessage API. This way, we can avoid complications in logic where we complete the futures and keep it as simple as V1.
Will make sure will discuss in Surface API review meeting
@@ -85,6 +86,29 @@ default CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibili | |||
throw new UnsupportedOperationException(); | |||
} | |||
|
|||
|
|||
default CompletableFuture<SendMessageResponse> sendMessage(Consumer<SendMessageRequest.Builder> sendMessageRequest) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing javadocs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my bad I raised a separate PR for this and accidently pushed some changes
queueAttributeMap.set(null); // Reset on failure | ||
newFuture.completeExceptionally(t); // Complete the future exceptionally | ||
} else { | ||
newFuture.complete(r); // Complete the future with the result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we remove the comments? The code seems to be self explanatory to me and having comments here clutter the code a bit IMO.
future.whenComplete((r, t) -> { | ||
if (t != null) { | ||
queueAttributeMap.set(null); // Reset the future on failure | ||
} | ||
}); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this here? It seems we nullify it on line 119
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch , its not required.
}); | ||
} | ||
|
||
public void shutdown() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we follow v2 style - make it extend SdkAutoCloseable
and implement close
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
...ftware/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageCompletableFuture.java
Outdated
Show resolved
Hide resolved
ReceiveMessageCompletableFuture receiveMessageFuture = | ||
new ReceiveMessageCompletableFuture(numMessages, waitTimeMs); | ||
receiveQueueBuffer.receiveMessage(receiveMessageFuture); | ||
receiveMessageFuture.startWaitTimer(executor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of writing timer logic, can we use CompletableFuture#applyToEither
or CompletebleFuture#anyOf
instead where one is waiting for the response, the other is completing the future with empty response after the timeout.
We may be able to get rid of ReceiveMessageCompletableFuture
class if the purpose of it is to encapsulate timer logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
93d2b7e
to
1a0b987
Compare
} | ||
|
||
private void satisfyFuturesFromBuffer() { | ||
pruneExpiredTasks(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems we can check expired tasks where we are processing future, i.e., line 170?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
/** | ||
* @return the optional maximum amount of time that an outgoing call waits to be batched with messages of the same type. | ||
*/ | ||
public Duration maxBatchOpenDuration() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to change this as part of this PR; I'm just writing down alternatives batchOpenTimeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will change this in next PR where I will handle Batch Size related comments from Surface area review
* @return the message attributes receive calls will request. | ||
*/ | ||
public List<String> receiveMessageAttributeNames() { | ||
return receiveMessageAttributeNames == null ? Collections.emptyList() : |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Collections.unmodifiableList(Collections.emptyList())
? Can we do the wrap in the ctor so that we don't need to wrap every time getter is invoked?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Handled this is builders() now
if (receiveQueueBuffer.isShutDown()) { | ||
throw new IllegalStateException("The client has been shut down."); | ||
} | ||
int numMessages = Optional.ofNullable(rq.maxNumberOfMessages()).orElse(10); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we define all magic numbers in a single class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added SQSMessageDefaults
} | ||
|
||
public CompletableFuture<ReceiveMessageResponse> batchRequest(ReceiveMessageRequest request) { | ||
return canBeRetrievedFromQueueBuffer(request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add some debug logs to help troubleshoot in the future, i.e., when it decides to retrieve from the buffer or send it through the SDK client? Example: https://github.com/aws/aws-sdk-java-v2/blob/master/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java#L75
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its safe to add only in canBeRetrievedFromQueueBuffer false case , else we will get lot of logs , added them now
} | ||
|
||
int desiredBatches = determineDesiredBatches(); | ||
if (finishedTasks.size() >= desiredBatches) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ConcurrentLinkedQueue#size() is expensive
, we should probably have a separate counter. https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html#size--
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maximum number of Task in this case can be 10 since we decided to set the internal default to 10.
Adding separate counter will require to add synchronous blocks to that we atomically add and increment .
Thus considering the o(n) which will be max 10 I donot think we would require separate counters
return; | ||
} | ||
|
||
queueAttributesManager.getVisibilityTimeout().thenAcceptAsync(visibilityTimeout -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason we want to offload this to another executor? Offloading it to a thread may add more latency.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No reason , modified it
queueAttributesManager.getVisibilityTimeout().thenAcceptAsync(visibilityTimeout -> { | ||
int max = Math.max(config.maxInflightReceiveBatches(), 1); | ||
int toSpawn = max - inflightReceiveMessageBatches.get(); | ||
if (toSpawn > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we spawn toSpawn
worth of messages instead of just 1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nope , we should spawn exactly 1 , we donot want to spawn N number of messages but we want to minimize the number of spawned messages, that decision is taken based on inflight and unprocessed batches
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for my own understanding, could you elaborate on why we don't want to spawn N number of messages?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The main purpose of the Automatic Batch Manager is to reduce the number of Service API calls.
We initiate an additional call at the end of an existing call or when a response is received to pre-fill our internal buffers. This is necessary because SQS doesn’t always have messages immediately available; it takes some time for the servers to populate the messages.
In most cases, users perform a Receive call with the default message size of 1. However, in the background, we fetch the maximum number of messages (10) in advance. This means we don't need N individual calls, but rather just one additional call to pre-fetch those messages. If we were to make N calls, we could end up with a maximum of N*10 messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha, thanks for the clarification
} | ||
}); | ||
} finally { | ||
processingFutures.set(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably double check again there's no task that needs to be published before releasing the flag.
private void satisfyFuturesFromBuffer() {
do {
if (!processingFutures.compareAndSet(false, true)) {
return;
}
try {
FutureRequestWrapper future = futures.poll();
if (!future.future.isDone()) {
fulfillFuture(future);
}
} finally {
processingFutures.set(false);
}
} while (shouldProcessMore());
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
return desiredBatches; | ||
} | ||
|
||
private void fulfillFuture(FutureRequestWrapper futureWrapper, ReceiveSqsMessageHelper messageHelper) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Suggesting calling finishedTasks.peek() directly instead of passing messageHelper
in this method since it already has logic to use it.
private void fulfillFuture(FutureRequestWrapper futureWrapper)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -13,13 +13,12 @@ | |||
* permissions and limitations under the License. | |||
*/ | |||
|
|||
package software.amazon.awssdk.services.sqs.BatchManager; | |||
package software.amazon.awssdk.services.sqs.batchmanager; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we still need this class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To test abstract class RequestBatchManager we need this class
// Logging an error is sufficient here as this is an asynchronous cleanup activity, | ||
// and there are no dependent tasks waiting for its completion. | ||
if (t != null) { | ||
log.error(() -> "Could not change visibility for queue " + queueUrl, t); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be warn instead? Do customers need to retry this by themselves? Can we add more instructions here? Example: https://github.com/aws/aws-sdk-java-v2/blob/master/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/GenericMultipartHelper.java#L121
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeap, made it to warn.
No customer need not retry , the SQS will make those messages visible after visibilty timeout is over
|
||
public void clear() { | ||
if (!isEmpty()) { | ||
Optional.ofNullable(nackMessages()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we just use the good ol' null check instead of wrapping it with Optional.ofNullable (extra object)?
public void clear() { | ||
if (!isEmpty()) { | ||
Optional.ofNullable(nackMessages()) | ||
.ifPresent(future -> future.whenComplete((r, t) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: if we are only handling exception case, we can just use exceptionally
.exceptionally(throwable -> {
}
return asyncClient.changeMessageVisibilityBatch(batchRequest); | ||
} | ||
|
||
public Integer messagesSize() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this method? messages.size() is expensive
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This message is very helpful for testing, this is internal API and used only for testing , I added @SdkTestInternalApi just to convey its only for testing
return exception; | ||
} | ||
|
||
public void setException(Throwable exception) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private? or maybe just remove this method and call this.exception = exception
directly
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
18c06cc
into
feature/master/sqs-batch-manager
@zoewangg merge for now since I had to raise Surface API related PR , please approve this or let me know if more comments |
} | ||
|
||
public ReceiveQueueBuffer build() { | ||
return new ReceiveQueueBuffer(executor, sqsClient, config, queueUrl, queueAttributesManager); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we do return new ReceiveQueueBuffer(this);
?
queueAttributesManager.getVisibilityTimeout().thenAcceptAsync(visibilityTimeout -> { | ||
int max = Math.max(config.maxInflightReceiveBatches(), 1); | ||
int toSpawn = max - inflightReceiveMessageBatches.get(); | ||
if (toSpawn > 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just for my own understanding, could you elaborate on why we don't want to spawn N number of messages?
@SdkInternalApi | ||
public final class SqsMessageDefault { | ||
|
||
public static final int MAX_SUPPORTED_SQS_RECEIVE_MSG = 10; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's also ResponseBatchConfiguration, should we add this constant to that class?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Raised #5550 for above comments |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with the latest change
* Codegenerate BatchManager API under AsyncClient and Initail Interfaces for BatchManager (#5321) * Codegenerate BatchManager API under AsyncClient and Add empty initial Batchmanager interfaces and Implementations * Addressed review comments * Added Internal classes required for BatchManager Implementation * Revert "Added Internal classes required for BatchManager Implementation" This reverts commit 318969b. * Internal classes and RequestBatchManager Impelementation (#5418) * Added Internal classes required for BatchManager Implementation * Added Batch Send Implementation * Handled review comments * Handled review comments * Handled review comments * Made RequestBatchManager class a Abstract class * Checkstyle issues * Removed unused methods * New lines removed * Made public static to private state for sqsBatch functions * Constants added * Sonar cloud issues fixed * commit to check why test on codebuild * Increased Timeouts for get * Added abstract methods * Handled comments to remove Builders * Handled comments to take care when batchmanager closed while pending requests * Handled comments * Checkstyle issue * Added Consumer builders args for existing APIs of BatchManager (#5514) * Receive Batch Manager Implementation (#5488) * Add Recieve Buffer Queue And its related configuration * Update ReceiveBatch manager * Recieve Batch Manager Implementation * Receive Batch Manager Implemetation * Handled review comments * Checkstyle failure * Flsky test case fixed * Flaky test case fixed * Hamdled review comments * Handled comments * Removed ReceiveMessageCompletableFuture * SdkClosable implemented * Added ReceiveMessageBatchManager class for completeness * Checkstyle issues * Null checks * Handled comments from Zoe * Updated the defaults to 50ms same as V1 after surface area review * Revert "Updated the defaults to 50ms same as V1 after surface area review" This reverts commit e7d2295. * Bytes Based batching for SendMessageRequest Batching (#5540) * Initial changes * Initial changes 2 * Byte Based batching for SendMessage API * Byte Based batching for SendMessage API * Handled comments * Checkstyle issue * Add User Agent for Sqs Calls made using Automatic Batching Manager (#5546) * Add User Agent for Sqs Calls made using Automatic Batching Manager as hll/abm * Review comments * Update comments from PR 5488 (#5550) * Update comments of PR 5488 * Update comments from PR 5488 * Handled surface area review comments (#5563) * Initial version * Intermediate changes * Update after internal poll * ResponseCOnfiguration construction updated * RequestOverride configuration check added to Bypass batch manager * Handled review comments * Removed TODO since validations are handled in BatchPverrideConfiguration * Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages (#5571) * Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages * Handled review comments * Integ test for Automatic Request Batching (#5576) * feat(sqs): add BatchManager for client-side request batching to Amazon SQS The new BatchManager allows for simple request batching using client-side buffering, improving cost efficiency and reducing the number of requests sent to Amazon SQS. The client-side buffering supports up to 10 requests per batch and is supported by the SqsAsyncClient. Batched requests, along with receive message polling, help to increase throughput. * Add check for scheduledExecutor such that it not null when creating SqsAsyncBatchManager (#5582) * Add check for scheduledExecutor such that it not null when creating SqsAsyncBatchManager * Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java Co-authored-by: David Ho <[email protected]> * Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java Co-authored-by: David Ho <[email protected]> * Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java Co-authored-by: David Ho <[email protected]> --------- Co-authored-by: David Ho <[email protected]> * Updating Timeouts in gets so that we dont wait infinitely --------- Co-authored-by: David Ho <[email protected]>
Motivation and Context
This change is required to efficiently manage the receiving of message batches from AWS SQS, ensuring proper handling of message visibility timeout and error scenarios. It solves the problem of handling asynchronous message retrieval and processing, managing visibility timeouts, and handling exceptions during the process.
Modifications
AsyncReceiveMessageBatch
Class:ReceiveMessageRequest
based on configuration settings.ReceiveMessageCompletableFuture
Class:ReceiveQueueBuffer
Class:ResponseBatchConfiguration
Class:QueueAttributesManager
Class:Design Details
Asynchronous Handling:
AsyncReceiveMessageBatch
uses asynchronous handling for receiving messages from SQS. It leveragesCompletableFuture
to manage asynchronous operations and handle results or exceptions.Visibility Timeout Management:
visibilityDeadlineNano
.Error Handling:
AsyncReceiveMessageBatch
andQueueAttributesManager
classes handle exceptions by setting the future to complete exceptionally and resetting state where necessary.Concurrent Task Management:
ReceiveQueueBuffer
uses a concurrent queue to manage finished tasks and inflight receive message batches, ensuring thread-safe operations.QueueAttributesManager
uses anAtomicReference
to manage the cached future for fetching queue attributes, ensuring that only one request is made even under concurrent access.Adaptive Prefetching:
ResponseBatchConfiguration
supports adaptive prefetching, adjusting the number of batches based on the load and configuration.Testing
Unit tests were added for
AsyncReceiveMessageBatch
to cover:Tests for
ReceiveQueueBuffer
include:QueueAttributesManager
tests:License