Skip to content

Added Consumer builders args for existing APIs of BatchManager #5514

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.internal.batchmanager.DefaultSqsAsyncBatchManager;
Expand All @@ -35,7 +36,6 @@
* <p>
* This manager buffers client requests and sends them in batches to the service, enhancing efficiency by reducing the number of
* API requests. Requests are buffered until they reach a specified limit or a timeout occurs.
* TODO : add consumer builder overloads for requests for all the methods.
*/
@SdkPublicApi
public interface SqsAsyncBatchManager extends SdkAutoCloseable {
Expand All @@ -61,6 +61,19 @@ default CompletableFuture<SendMessageResponse> sendMessage(SendMessageRequest re
throw new UnsupportedOperationException();
}


/**
* Buffers and batches {@link SendMessageRequest}s using a {@link Consumer} to configure the request,
* sending them as a {@link software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest}.
* Requests are grouped by queue URL and override configuration, and sent when the batch size or timeout is reached.
*
* @param sendMessageRequest A {@link Consumer} to configure the SendMessageRequest to be buffered.
* @return CompletableFuture of the corresponding {@link SendMessageResponse}.
*/
default CompletableFuture<SendMessageResponse> sendMessage(Consumer<SendMessageRequest.Builder> sendMessageRequest) {
return sendMessage(SendMessageRequest.builder().applyMutation(sendMessageRequest).build());
}

/**
* Buffers and batches {@link DeleteMessageRequest}s, sending them as a
* {@link software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest}. Requests are grouped by queue URL and override
Expand All @@ -73,6 +86,19 @@ default CompletableFuture<DeleteMessageResponse> deleteMessage(DeleteMessageRequ
throw new UnsupportedOperationException();
}


/**
* Buffers and batches {@link DeleteMessageRequest}s using a {@link Consumer} to configure the request,
* sending them as a {@link software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest}.
* Requests are grouped by queue URL and override configuration, and sent when the batch size or timeout is reached.
*
* @param request A {@link Consumer} to configure the DeleteMessageRequest to be buffered.
* @return CompletableFuture of the corresponding {@link DeleteMessageResponse}.
*/
default CompletableFuture<DeleteMessageResponse> deleteMessage(Consumer<DeleteMessageRequest.Builder> request) {
return deleteMessage(DeleteMessageRequest.builder().applyMutation(request).build());
}

/**
* Buffers and batches {@link ChangeMessageVisibilityRequest}s, sending them as a
* {@link software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest}. Requests are grouped by queue URL
Expand All @@ -85,6 +111,20 @@ default CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibili
throw new UnsupportedOperationException();
}

/**
* Buffers and batches {@link ChangeMessageVisibilityRequest}s using a {@link Consumer} to configure the request,
* sending them as a {@link software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest}.
* Requests are grouped by queue URL and override configuration, and sent when the batch size or timeout is reached.
*
* @param request A {@link Consumer} to configure the ChangeMessageVisibilityRequest to be buffered.
* @return CompletableFuture of the corresponding {@link ChangeMessageVisibilityResponse}.
*/
default CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(
Consumer<ChangeMessageVisibilityRequest.Builder> request) {
return changeMessageVisibility(ChangeMessageVisibilityRequest.builder().applyMutation(request).build());
}


/**
* Buffers and retrieves messages with {@link ReceiveMessageRequest}, with a maximum of 10 messages per request. Returns an
* empty message if no messages are available in SQS.
Expand All @@ -96,6 +136,19 @@ default CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageR
throw new UnsupportedOperationException();
}

/**
* Buffers and retrieves messages with {@link ReceiveMessageRequest} using a {@link Consumer} to configure the request,
* with a maximum of 10 messages per request. Returns an empty message if no messages are available in SQS.
*
* @param request A {@link Consumer} to configure the ReceiveMessageRequest.
* @return CompletableFuture of the corresponding {@link ReceiveMessageResponse}.
*/
default CompletableFuture<ReceiveMessageResponse> receiveMessage(
Consumer<ReceiveMessageRequest.Builder> request) {
return receiveMessage(ReceiveMessageRequest.builder().applyMutation(request).build());
}


interface Builder {

/**
Expand All @@ -106,6 +159,16 @@ interface Builder {
*/
Builder overrideConfiguration(BatchOverrideConfiguration overrideConfiguration);

/**
* Sets custom overrides for the BatchManager configuration using a {@link Consumer} to configure the overrides.
*
* @param overrideConfiguration A {@link Consumer} to configure the {@link BatchOverrideConfiguration}.
* @return This builder for method chaining.
*/
default Builder overrideConfiguration(Consumer<BatchOverrideConfiguration.Builder> overrideConfiguration) {
return overrideConfiguration(BatchOverrideConfiguration.builder().applyMutation(overrideConfiguration).build());
}

/**
* Sets a custom {@link software.amazon.awssdk.services.sqs.SqsClient} for polling resources. This client must be closed
* by the caller.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.core.exception.SdkClientException;
Expand All @@ -47,7 +48,7 @@

public abstract class BaseSqsBatchManagerTest {

private static final String DEFAULT_QUEUE_URL = "SomeQueueUrl";
protected static final String DEFAULT_QUEUE_URL = "SomeQueueUrl";
private static final int DEFAULT_MAX_BATCH_OPEN = 200;


Expand Down Expand Up @@ -293,25 +294,6 @@ public abstract List<CompletableFuture<SendMessageResponse>> createAndSendSendMe

public abstract List<CompletableFuture<ChangeMessageVisibilityResponse>> createAndSendChangeVisibilityRequests();

SendMessageRequest createSendMessageRequest(String messageBody) {
return SendMessageRequest.builder()
.messageBody(messageBody)
.queueUrl(DEFAULT_QUEUE_URL)
.build();
}

DeleteMessageRequest createDeleteMessageRequest() {
return DeleteMessageRequest.builder()
.queueUrl(DEFAULT_QUEUE_URL)
.build();
}

ChangeMessageVisibilityRequest createChangeVisibilityRequest() {
return ChangeMessageVisibilityRequest.builder()
.queueUrl(DEFAULT_QUEUE_URL)
.build();
}

private String getMd5Hash(String message) {
byte[] expectedMd5;
expectedMd5 = Md5Utils.computeMD5Hash(message.getBytes(StandardCharsets.UTF_8));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,41 +78,26 @@ public void tearDown() {

@Override
public List<CompletableFuture<SendMessageResponse>> createAndSendSendMessageRequests(String message1, String message2) {
List<SendMessageRequest> requests = new ArrayList<>();
requests.add(createSendMessageRequest(message1));
requests.add(createSendMessageRequest(message2));

List<CompletableFuture<SendMessageResponse>> responses = new ArrayList<>();
for (SendMessageRequest request : requests) {
responses.add(batchManager.sendMessage(request));
}
responses.add(batchManager.sendMessage(builder -> builder.queueUrl(DEFAULT_QUEUE_URL).messageBody(message1)));
responses.add(batchManager.sendMessage(builder -> builder.queueUrl(DEFAULT_QUEUE_URL).messageBody(message2)));
return responses;
}

@Override
public List<CompletableFuture<DeleteMessageResponse>> createAndSendDeleteMessageRequests() {
List<DeleteMessageRequest> requests = new ArrayList<>();
requests.add(createDeleteMessageRequest());
requests.add(createDeleteMessageRequest());
List<CompletableFuture<DeleteMessageResponse>> responses = new ArrayList<>();

for (DeleteMessageRequest request : requests) {
responses.add(batchManager.deleteMessage(request));
}
responses.add(batchManager.deleteMessage(builder -> builder.queueUrl(DEFAULT_QUEUE_URL)));
responses.add(batchManager.deleteMessage(builder -> builder.queueUrl(DEFAULT_QUEUE_URL)));
return responses;
}

@Override
public List<CompletableFuture<ChangeMessageVisibilityResponse>> createAndSendChangeVisibilityRequests() {
List<ChangeMessageVisibilityRequest> requests = new ArrayList<>();
requests.add(createChangeVisibilityRequest());
requests.add(createChangeVisibilityRequest());

List<CompletableFuture<ChangeMessageVisibilityResponse>> responses = new ArrayList<>();
for (ChangeMessageVisibilityRequest request : requests) {
responses.add(batchManager.changeMessageVisibility(request));
}

responses.add(batchManager.changeMessageVisibility(builder -> builder.queueUrl(DEFAULT_QUEUE_URL)));
responses.add(batchManager.changeMessageVisibility(builder -> builder.queueUrl(DEFAULT_QUEUE_URL)));
return responses;
}
}
Loading