Skip to content

Receive Batch Manager Implementation #5488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged

Conversation

joviegas
Copy link
Contributor

@joviegas joviegas commented Aug 8, 2024

Motivation and Context

This change is required to efficiently manage the receiving of message batches from AWS SQS, ensuring proper handling of message visibility timeout and error scenarios. It solves the problem of handling asynchronous message retrieval and processing, managing visibility timeouts, and handling exceptions during the process.

Modifications

  • AsyncReceiveMessageBatch Class:

    • Forms a ReceiveMessageRequest based on configuration settings.
    • Collects messages from an AWS SQS queue.
    • Handles exceptions during the process.
    • Manages message visibility timeout by tracking the visibility deadline and expiring messages if not processed in time.
    • Supports clearing messages in the batch and changing their visibility as needed.
  • ReceiveMessageCompletableFuture Class:

    • Manages the completion of the receive message future with success or failure.
    • Starts a wait timer for the receive message request and handles timeout scenarios.
  • ReceiveQueueBuffer Class:

    • Manages the queue buffer for received messages.
    • Handles the reception of messages and the spawning of additional receive tasks.
    • Manages the clearing of finished tasks and handling of futures for received messages.
    • Supports shutdown functionality, ensuring all futures are completed exceptionally upon shutdown.
  • ResponseBatchConfiguration Class:

    • Configures various batch settings such as visibility timeout, long poll settings, and adaptive prefetching.
  • QueueAttributesManager Class:

    • Manages SQS queue attributes.
    • Fetches and caches attributes like visibility timeout and receive message wait time.
    • Ensures only a single call is made to fetch the attributes, with proper handling of concurrent requests and failures.

Design Details

  • Asynchronous Handling:

    • The AsyncReceiveMessageBatch uses asynchronous handling for receiving messages from SQS. It leverages CompletableFuture to manage asynchronous operations and handle results or exceptions.
  • Visibility Timeout Management:

    • The class tracks the visibility deadline for messages to ensure they are reprocessed if not handled within the specified time. This is managed using visibilityDeadlineNano.
  • Error Handling:

    • The AsyncReceiveMessageBatch and QueueAttributesManager classes handle exceptions by setting the future to complete exceptionally and resetting state where necessary.
  • Concurrent Task Management:

    • ReceiveQueueBuffer uses a concurrent queue to manage finished tasks and inflight receive message batches, ensuring thread-safe operations.
    • QueueAttributesManager uses an AtomicReference to manage the cached future for fetching queue attributes, ensuring that only one request is made even under concurrent access.
  • Adaptive Prefetching:

    • The ResponseBatchConfiguration supports adaptive prefetching, adjusting the number of batches based on the load and configuration.

Testing

  • Unit tests were added for AsyncReceiveMessageBatch to cover:

    • Message receiving and handling.
    • Visibility timeout management.
    • Error handling during message receiving.
  • Tests for ReceiveQueueBuffer include:

    • Handling multiple receive message requests.
    • Managing finished tasks and inflight batches.
    • Testing shutdown behavior.
  • QueueAttributesManager tests:

    • Fetching and caching queue attributes.
    • Handling exceptions during attribute fetching.
    • Ensuring concurrent requests are handled correctly and only a single call is made to fetch attributes.

License

  • I confirm that this pull request can be released under the Apache 2 license

@joviegas joviegas requested a review from a team as a code owner August 8, 2024 19:53
Comment on lines 91 to 98
messages = new ArrayList<>(response.messages());
}
open.set(true);
return this;
});
} finally {
visibilityDeadlineNano = System.nanoTime() + visibilityTimeout.toNanos();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if this gets called multiple times? Seems like these can get out of sync; for example one request might succeed so messages has content, but another one fails so exception is also non-null.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is called per instance of AsyncReceiveMessageBatch. Each thread creates its own instance, so there’s no case where it can go out of sync. Since this internal API is called per instance, I can’t think of a scenario where a single instance is created and accessed by multiple threads. If you see any such case, please let me know

this.config = config;
}

public CompletableFuture<AsyncReceiveMessageBatch> asyncReceiveMessage() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support calling this multiple times?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Every spawn calls create as new instances of AsyncReceiveMessageBatch

                AsyncReceiveMessageBatch asyncReceiveMessageBatch = new AsyncReceiveMessageBatch(
                    queueUrl, sqsClient, visibilityTimeoutNanos, config);
                inflightReceiveMessageBatches.incrementAndGet();
                asyncReceiveMessageBatch.asyncReceiveMessage()
                                        .whenComplete((response, exception) -> reportBatchFinished(response));

So two simultaneous calls will create two instances and do two separate reportBatchFinished which is expected.
However AsyncReceiveMessageBatch is handled for Thread safety

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By making

private volatile Queue<Message> messages = new ConcurrentLinkedQueue<>(); // Thread-safe queue

We are now able to run this mutiple times in a thread safe way


public AsyncReceiveMessageBatch(ScheduledExecutorService scheduledExecutorService, String queueUrl,
SqsAsyncClient asyncClient, Duration visibilityTimeout, ResponseBatchConfiguration config) {
this.scheduledExecutorService = scheduledExecutorService;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks unused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.sqs.BatchManager;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

capitalized package name

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private final ResponseBatchConfiguration config;
private final AtomicBoolean open = new AtomicBoolean(false);
private volatile Throwable exception;
private List<Message> messages;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the very least, this probably needs to be volatile too

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, made it volatile and used CopyOnWriteArrayList

return asyncClient.changeMessageVisibilityBatch(batchRequest);
}

private void checkIfOpen() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really sure what the purpose of ensure this is "open" should be. What does "open" indicate?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed this , this was redundant check to make sure we donot access the exception or messages while the receieveMessage call has not happened or in progress.

@dagnir
Copy link
Contributor

dagnir commented Aug 8, 2024

Still looking, giving some early feedback

});

resultFuture.whenComplete((r, t) -> {
if (resultFuture.isCancelled()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: probably better to just check instanceof on t. If this future was cancelled, t will have type CancellationException.

@joviegas joviegas force-pushed the joviegas/receive_buffer_queue branch from 5a887a7 to 6e47217 Compare August 14, 2024 22:56
Copy link
Contributor

@zoewangg zoewangg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still going through the PR, left some initial comments

Copy link
Contributor

@zoewangg zoewangg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some comments, still not finished review

}

/**
* Define the message attributes receive calls will request. Only receive message requests that request the same set of
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand this is what used in v1, but should we use something like .receiveMessageRequestTransformer to make it easier for customers to change other things in the request? And we can get rid of the other settings like maxBatchItems?

Prior art: https://github.com/aws/aws-sdk-java-v2/blob/master/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/model/DownloadDirectoryRequest.java#L329

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, Zoe. If that’s the case, we’ll need to change the behavior of the feature. Specifically, when fetching SQS messages, we would fetch all the attributes and then filter them at the request level based on the attributes passed in the request. This change would have two main impacts:

1.	Diverging from V1 behavior.
2.	Requiring the retrieval of all attributes, even when the user has configured only a specific number of attribute names in the configuration.

It’s important to note that we currently only respect the maxNumberOfMessages and waitTimeSeconds parameters in the request, which are specific to the request. The other parameters are referenced from the configuration settings and are queue-level settings.

Given this, we have the following options:

•	We could remove receiveAttributeNames for now and add it later if there is a user request.
•	We could keep it the same as V1 and provide a configuration option, as it currently exists.
•	If we provide a request-level option like receiveMessageRequestTransformer, we could pull out the entire attributes map from SQS and then filter it while completing the future.

From my observation of the attributeMap, I think the current behavior—where configuration-level attributeNames are used—is intended to optimize the receiveMessage payload when it comes to the map.

Also, in my opinion, if batchManager is something I expect to operate at the batch level, then if users want specific attributes, they can use the normal receiveMessage API. This way, we can avoid complications in logic where we complete the futures and keep it as simple as V1.

Will make sure will discuss in Surface API review meeting

@@ -85,6 +86,29 @@ default CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibili
throw new UnsupportedOperationException();
}


default CompletableFuture<SendMessageResponse> sendMessage(Consumer<SendMessageRequest.Builder> sendMessageRequest) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing javadocs

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad I raised a separate PR for this and accidently pushed some changes

Comment on lines 119 to 122
queueAttributeMap.set(null); // Reset on failure
newFuture.completeExceptionally(t); // Complete the future exceptionally
} else {
newFuture.complete(r); // Complete the future with the result
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove the comments? The code seems to be self explanatory to me and having comments here clutter the code a bit IMO.

Comment on lines 161 to 165
future.whenComplete((r, t) -> {
if (t != null) {
queueAttributeMap.set(null); // Reset the future on failure
}
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this here? It seems we nullify it on line 119

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch , its not required.

});
}

public void shutdown() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we follow v2 style - make it extend SdkAutoCloseable and implement close?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

ReceiveMessageCompletableFuture receiveMessageFuture =
new ReceiveMessageCompletableFuture(numMessages, waitTimeMs);
receiveQueueBuffer.receiveMessage(receiveMessageFuture);
receiveMessageFuture.startWaitTimer(executor);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of writing timer logic, can we use CompletableFuture#applyToEither or CompletebleFuture#anyOf instead where one is waiting for the response, the other is completing the future with empty response after the timeout.

We may be able to get rid of ReceiveMessageCompletableFuture class if the purpose of it is to encapsulate timer logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@joviegas joviegas force-pushed the joviegas/receive_buffer_queue branch from 93d2b7e to 1a0b987 Compare August 18, 2024 01:00
}

private void satisfyFuturesFromBuffer() {
pruneExpiredTasks();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems we can check expired tasks where we are processing future, i.e., line 170?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

/**
* @return the optional maximum amount of time that an outgoing call waits to be batched with messages of the same type.
*/
public Duration maxBatchOpenDuration() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need to change this as part of this PR; I'm just writing down alternatives batchOpenTimeout

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will change this in next PR where I will handle Batch Size related comments from Surface area review

* @return the message attributes receive calls will request.
*/
public List<String> receiveMessageAttributeNames() {
return receiveMessageAttributeNames == null ? Collections.emptyList() :
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Collections.unmodifiableList(Collections.emptyList())? Can we do the wrap in the ctor so that we don't need to wrap every time getter is invoked?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Handled this is builders() now

if (receiveQueueBuffer.isShutDown()) {
throw new IllegalStateException("The client has been shut down.");
}
int numMessages = Optional.ofNullable(rq.maxNumberOfMessages()).orElse(10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we define all magic numbers in a single class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added SQSMessageDefaults

}

public CompletableFuture<ReceiveMessageResponse> batchRequest(ReceiveMessageRequest request) {
return canBeRetrievedFromQueueBuffer(request)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add some debug logs to help troubleshoot in the future, i.e., when it decides to retrieve from the buffer or send it through the SDK client? Example: https://github.com/aws/aws-sdk-java-v2/blob/master/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithKnownContentLengthHelper.java#L75

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think its safe to add only in canBeRetrievedFromQueueBuffer false case , else we will get lot of logs , added them now

}

int desiredBatches = determineDesiredBatches();
if (finishedTasks.size() >= desiredBatches) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentLinkedQueue#size() is expensive, we should probably have a separate counter. https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html#size--

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maximum number of Task in this case can be 10 since we decided to set the internal default to 10.
Adding separate counter will require to add synchronous blocks to that we atomically add and increment .
Thus considering the o(n) which will be max 10 I donot think we would require separate counters

return;
}

queueAttributesManager.getVisibilityTimeout().thenAcceptAsync(visibilityTimeout -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason we want to offload this to another executor? Offloading it to a thread may add more latency.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason , modified it

queueAttributesManager.getVisibilityTimeout().thenAcceptAsync(visibilityTimeout -> {
int max = Math.max(config.maxInflightReceiveBatches(), 1);
int toSpawn = max - inflightReceiveMessageBatches.get();
if (toSpawn > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we spawn toSpawn worth of messages instead of just 1?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope , we should spawn exactly 1 , we donot want to spawn N number of messages but we want to minimize the number of spawned messages, that decision is taken based on inflight and unprocessed batches

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my own understanding, could you elaborate on why we don't want to spawn N number of messages?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main purpose of the Automatic Batch Manager is to reduce the number of Service API calls.

We initiate an additional call at the end of an existing call or when a response is received to pre-fill our internal buffers. This is necessary because SQS doesn’t always have messages immediately available; it takes some time for the servers to populate the messages.

In most cases, users perform a Receive call with the default message size of 1. However, in the background, we fetch the maximum number of messages (10) in advance. This means we don't need N individual calls, but rather just one additional call to pre-fetch those messages. If we were to make N calls, we could end up with a maximum of N*10 messages.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, thanks for the clarification

}
});
} finally {
processingFutures.set(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably double check again there's no task that needs to be published before releasing the flag.

    private void satisfyFuturesFromBuffer() {
        do {
            if (!processingFutures.compareAndSet(false, true)) {
                return;
            }
            try {
                FutureRequestWrapper future = futures.poll();
                if (!future.future.isDone()) {
                    fulfillFuture(future);
                }
            } finally {
                processingFutures.set(false);
            }
        } while (shouldProcessMore());
    }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return desiredBatches;
}

private void fulfillFuture(FutureRequestWrapper futureWrapper, ReceiveSqsMessageHelper messageHelper) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggesting calling finishedTasks.peek() directly instead of passing messageHelper in this method since it already has logic to use it.

private void fulfillFuture(FutureRequestWrapper futureWrapper) 

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -13,13 +13,12 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.sqs.BatchManager;
package software.amazon.awssdk.services.sqs.batchmanager;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To test abstract class RequestBatchManager we need this class

// Logging an error is sufficient here as this is an asynchronous cleanup activity,
// and there are no dependent tasks waiting for its completion.
if (t != null) {
log.error(() -> "Could not change visibility for queue " + queueUrl, t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be warn instead? Do customers need to retry this by themselves? Can we add more instructions here? Example: https://github.com/aws/aws-sdk-java-v2/blob/master/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/GenericMultipartHelper.java#L121

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeap, made it to warn.
No customer need not retry , the SQS will make those messages visible after visibilty timeout is over


public void clear() {
if (!isEmpty()) {
Optional.ofNullable(nackMessages())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we just use the good ol' null check instead of wrapping it with Optional.ofNullable (extra object)?

public void clear() {
if (!isEmpty()) {
Optional.ofNullable(nackMessages())
.ifPresent(future -> future.whenComplete((r, t) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: if we are only handling exception case, we can just use exceptionally

.exceptionally(throwable -> {
}

return asyncClient.changeMessageVisibilityBatch(batchRequest);
}

public Integer messagesSize() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this method? messages.size() is expensive

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This message is very helpful for testing, this is internal API and used only for testing , I added @SdkTestInternalApi just to convey its only for testing

return exception;
}

public void setException(Throwable exception) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private? or maybe just remove this method and call this.exception = exception directly

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link

Quality Gate Failed Quality Gate failed

Failed conditions
C Reliability Rating on New Code (required ≥ A)

See analysis details on SonarCloud

Catch issues before they fail your Quality Gate with our IDE extension SonarLint

@joviegas joviegas merged commit 18c06cc into feature/master/sqs-batch-manager Aug 27, 2024
16 of 17 checks passed
@joviegas
Copy link
Contributor Author

@zoewangg merge for now since I had to raise Surface API related PR , please approve this or let me know if more comments

}

public ReceiveQueueBuffer build() {
return new ReceiveQueueBuffer(executor, sqsClient, config, queueUrl, queueAttributesManager);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do return new ReceiveQueueBuffer(this);?

queueAttributesManager.getVisibilityTimeout().thenAcceptAsync(visibilityTimeout -> {
int max = Math.max(config.maxInflightReceiveBatches(), 1);
int toSpawn = max - inflightReceiveMessageBatches.get();
if (toSpawn > 0) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my own understanding, could you elaborate on why we don't want to spawn N number of messages?

@SdkInternalApi
public final class SqsMessageDefault {

public static final int MAX_SUPPORTED_SQS_RECEIVE_MSG = 10;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also ResponseBatchConfiguration, should we add this constant to that class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@joviegas
Copy link
Contributor Author

joviegas commented Sep 3, 2024

Raised #5550 for above comments

Copy link
Contributor

@zoewangg zoewangg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM with the latest change

github-merge-queue bot pushed a commit that referenced this pull request Sep 11, 2024
* Codegenerate BatchManager API under AsyncClient and Initail Interfaces for BatchManager (#5321)

* Codegenerate BatchManager API under AsyncClient and Add empty initial Batchmanager interfaces and Implementations

* Addressed review comments

* Added Internal classes required for BatchManager Implementation

* Revert "Added Internal classes required for BatchManager Implementation"

This reverts commit 318969b.

* Internal classes and RequestBatchManager Impelementation (#5418)

* Added Internal classes required for BatchManager Implementation

* Added Batch Send Implementation

* Handled review comments

* Handled review comments

* Handled review comments

* Made RequestBatchManager class a Abstract class

* Checkstyle issues

* Removed unused methods

* New lines removed

* Made public static to private state for sqsBatch functions

* Constants added

* Sonar cloud issues fixed

* commit to check why test on codebuild

* Increased Timeouts for get

* Added abstract methods

* Handled comments to remove Builders

* Handled comments to take care when batchmanager closed while pending requests

* Handled comments

* Checkstyle issue

* Added Consumer builders args for existing APIs of BatchManager (#5514)

* Receive Batch Manager Implementation (#5488)

* Add Recieve Buffer Queue And its related configuration

* Update ReceiveBatch  manager

* Recieve Batch Manager Implementation

* Receive Batch Manager Implemetation

* Handled review comments

* Checkstyle failure

* Flsky test case fixed

* Flaky test case fixed

* Hamdled review comments

* Handled comments

* Removed ReceiveMessageCompletableFuture

* SdkClosable implemented

* Added ReceiveMessageBatchManager class for completeness

* Checkstyle issues

* Null checks

* Handled comments from Zoe

* Updated the defaults to 50ms same as V1 after surface area review

* Revert "Updated the defaults to 50ms same as V1 after surface area review"

This reverts commit e7d2295.

* Bytes Based batching for SendMessageRequest Batching (#5540)

* Initial changes

* Initial changes 2

* Byte Based batching for SendMessage API

* Byte Based batching for SendMessage API

* Handled comments

* Checkstyle issue

* Add User Agent for Sqs Calls made using Automatic Batching Manager  (#5546)

* Add User Agent for Sqs Calls made using Automatic Batching Manager as hll/abm

* Review comments

* Update comments from  PR 5488 (#5550)

* Update comments of PR 5488

* Update comments from  PR 5488

* Handled surface area review comments (#5563)

* Initial version

* Intermediate changes

* Update after internal poll

* ResponseCOnfiguration construction updated

* RequestOverride configuration check added to Bypass batch manager

* Handled review comments

* Removed TODO since validations are handled in BatchPverrideConfiguration

* Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages (#5571)

* Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages

* Handled review comments

* Integ test for Automatic Request Batching (#5576)

* feat(sqs): add BatchManager for client-side request batching to Amazon SQS

The new BatchManager allows for simple request batching using client-side buffering, improving cost efficiency and reducing the number of requests sent to Amazon SQS. The client-side buffering supports up to 10 requests per batch and is supported by the SqsAsyncClient. Batched requests, along with receive message polling, help to increase throughput.

* Add check for scheduledExecutor such that it not null when creating SqsAsyncBatchManager (#5582)

* Add check for scheduledExecutor such that it not null when creating SqsAsyncBatchManager

* Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java

Co-authored-by: David Ho <[email protected]>

* Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java

Co-authored-by: David Ho <[email protected]>

* Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java

Co-authored-by: David Ho <[email protected]>

---------

Co-authored-by: David Ho <[email protected]>

* Updating Timeouts in gets so that we dont wait infinitely

---------

Co-authored-by: David Ho <[email protected]>
@joviegas joviegas deleted the joviegas/receive_buffer_queue branch January 15, 2025 17:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants