Skip to content

Commit 21642b1

Browse files
authored
Added Consumer builders args for existing APIs of BatchManager (#5514)
1 parent a04379b commit 21642b1

File tree

3 files changed

+72
-42
lines changed

3 files changed

+72
-42
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManager.java

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.concurrent.CompletableFuture;
1919
import java.util.concurrent.ScheduledExecutorService;
20+
import java.util.function.Consumer;
2021
import software.amazon.awssdk.annotations.SdkPublicApi;
2122
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2223
import software.amazon.awssdk.services.sqs.internal.batchmanager.DefaultSqsAsyncBatchManager;
@@ -35,7 +36,6 @@
3536
* <p>
3637
* This manager buffers client requests and sends them in batches to the service, enhancing efficiency by reducing the number of
3738
* API requests. Requests are buffered until they reach a specified limit or a timeout occurs.
38-
* TODO : add consumer builder overloads for requests for all the methods.
3939
*/
4040
@SdkPublicApi
4141
public interface SqsAsyncBatchManager extends SdkAutoCloseable {
@@ -61,6 +61,19 @@ default CompletableFuture<SendMessageResponse> sendMessage(SendMessageRequest re
6161
throw new UnsupportedOperationException();
6262
}
6363

64+
65+
/**
66+
* Buffers and batches {@link SendMessageRequest}s using a {@link Consumer} to configure the request,
67+
* sending them as a {@link software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest}.
68+
* Requests are grouped by queue URL and override configuration, and sent when the batch size or timeout is reached.
69+
*
70+
* @param sendMessageRequest A {@link Consumer} to configure the SendMessageRequest to be buffered.
71+
* @return CompletableFuture of the corresponding {@link SendMessageResponse}.
72+
*/
73+
default CompletableFuture<SendMessageResponse> sendMessage(Consumer<SendMessageRequest.Builder> sendMessageRequest) {
74+
return sendMessage(SendMessageRequest.builder().applyMutation(sendMessageRequest).build());
75+
}
76+
6477
/**
6578
* Buffers and batches {@link DeleteMessageRequest}s, sending them as a
6679
* {@link software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest}. Requests are grouped by queue URL and override
@@ -73,6 +86,19 @@ default CompletableFuture<DeleteMessageResponse> deleteMessage(DeleteMessageRequ
7386
throw new UnsupportedOperationException();
7487
}
7588

89+
90+
/**
91+
* Buffers and batches {@link DeleteMessageRequest}s using a {@link Consumer} to configure the request,
92+
* sending them as a {@link software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest}.
93+
* Requests are grouped by queue URL and override configuration, and sent when the batch size or timeout is reached.
94+
*
95+
* @param request A {@link Consumer} to configure the DeleteMessageRequest to be buffered.
96+
* @return CompletableFuture of the corresponding {@link DeleteMessageResponse}.
97+
*/
98+
default CompletableFuture<DeleteMessageResponse> deleteMessage(Consumer<DeleteMessageRequest.Builder> request) {
99+
return deleteMessage(DeleteMessageRequest.builder().applyMutation(request).build());
100+
}
101+
76102
/**
77103
* Buffers and batches {@link ChangeMessageVisibilityRequest}s, sending them as a
78104
* {@link software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest}. Requests are grouped by queue URL
@@ -85,6 +111,20 @@ default CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibili
85111
throw new UnsupportedOperationException();
86112
}
87113

114+
/**
115+
* Buffers and batches {@link ChangeMessageVisibilityRequest}s using a {@link Consumer} to configure the request,
116+
* sending them as a {@link software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest}.
117+
* Requests are grouped by queue URL and override configuration, and sent when the batch size or timeout is reached.
118+
*
119+
* @param request A {@link Consumer} to configure the ChangeMessageVisibilityRequest to be buffered.
120+
* @return CompletableFuture of the corresponding {@link ChangeMessageVisibilityResponse}.
121+
*/
122+
default CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(
123+
Consumer<ChangeMessageVisibilityRequest.Builder> request) {
124+
return changeMessageVisibility(ChangeMessageVisibilityRequest.builder().applyMutation(request).build());
125+
}
126+
127+
88128
/**
89129
* Buffers and retrieves messages with {@link ReceiveMessageRequest}, with a maximum of 10 messages per request. Returns an
90130
* empty message if no messages are available in SQS.
@@ -96,6 +136,19 @@ default CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageR
96136
throw new UnsupportedOperationException();
97137
}
98138

139+
/**
140+
* Buffers and retrieves messages with {@link ReceiveMessageRequest} using a {@link Consumer} to configure the request,
141+
* with a maximum of 10 messages per request. Returns an empty message if no messages are available in SQS.
142+
*
143+
* @param request A {@link Consumer} to configure the ReceiveMessageRequest.
144+
* @return CompletableFuture of the corresponding {@link ReceiveMessageResponse}.
145+
*/
146+
default CompletableFuture<ReceiveMessageResponse> receiveMessage(
147+
Consumer<ReceiveMessageRequest.Builder> request) {
148+
return receiveMessage(ReceiveMessageRequest.builder().applyMutation(request).build());
149+
}
150+
151+
99152
interface Builder {
100153

101154
/**
@@ -106,6 +159,16 @@ interface Builder {
106159
*/
107160
Builder overrideConfiguration(BatchOverrideConfiguration overrideConfiguration);
108161

162+
/**
163+
* Sets custom overrides for the BatchManager configuration using a {@link Consumer} to configure the overrides.
164+
*
165+
* @param overrideConfiguration A {@link Consumer} to configure the {@link BatchOverrideConfiguration}.
166+
* @return This builder for method chaining.
167+
*/
168+
default Builder overrideConfiguration(Consumer<BatchOverrideConfiguration.Builder> overrideConfiguration) {
169+
return overrideConfiguration(BatchOverrideConfiguration.builder().applyMutation(overrideConfiguration).build());
170+
}
171+
109172
/**
110173
* Sets a custom {@link software.amazon.awssdk.services.sqs.SqsClient} for polling resources. This client must be closed
111174
* by the caller.

services/sqs/src/test/java/software/amazon/awssdk/services/sqs/BatchManager/BaseSqsBatchManagerTest.java

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.ExecutionException;
3333
import java.util.concurrent.TimeUnit;
3434
import java.util.concurrent.TimeoutException;
35+
import java.util.function.Consumer;
3536
import org.junit.jupiter.api.Test;
3637
import org.junit.jupiter.api.extension.RegisterExtension;
3738
import software.amazon.awssdk.core.exception.SdkClientException;
@@ -47,7 +48,7 @@
4748

4849
public abstract class BaseSqsBatchManagerTest {
4950

50-
private static final String DEFAULT_QUEUE_URL = "SomeQueueUrl";
51+
protected static final String DEFAULT_QUEUE_URL = "SomeQueueUrl";
5152
private static final int DEFAULT_MAX_BATCH_OPEN = 200;
5253

5354

@@ -293,25 +294,6 @@ public abstract List<CompletableFuture<SendMessageResponse>> createAndSendSendMe
293294

294295
public abstract List<CompletableFuture<ChangeMessageVisibilityResponse>> createAndSendChangeVisibilityRequests();
295296

296-
SendMessageRequest createSendMessageRequest(String messageBody) {
297-
return SendMessageRequest.builder()
298-
.messageBody(messageBody)
299-
.queueUrl(DEFAULT_QUEUE_URL)
300-
.build();
301-
}
302-
303-
DeleteMessageRequest createDeleteMessageRequest() {
304-
return DeleteMessageRequest.builder()
305-
.queueUrl(DEFAULT_QUEUE_URL)
306-
.build();
307-
}
308-
309-
ChangeMessageVisibilityRequest createChangeVisibilityRequest() {
310-
return ChangeMessageVisibilityRequest.builder()
311-
.queueUrl(DEFAULT_QUEUE_URL)
312-
.build();
313-
}
314-
315297
private String getMd5Hash(String message) {
316298
byte[] expectedMd5;
317299
expectedMd5 = Md5Utils.computeMD5Hash(message.getBytes(StandardCharsets.UTF_8));

services/sqs/src/test/java/software/amazon/awssdk/services/sqs/BatchManager/SqsAsyncBatchManagerTest.java

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -78,41 +78,26 @@ public void tearDown() {
7878

7979
@Override
8080
public List<CompletableFuture<SendMessageResponse>> createAndSendSendMessageRequests(String message1, String message2) {
81-
List<SendMessageRequest> requests = new ArrayList<>();
82-
requests.add(createSendMessageRequest(message1));
83-
requests.add(createSendMessageRequest(message2));
84-
8581
List<CompletableFuture<SendMessageResponse>> responses = new ArrayList<>();
86-
for (SendMessageRequest request : requests) {
87-
responses.add(batchManager.sendMessage(request));
88-
}
82+
responses.add(batchManager.sendMessage(builder -> builder.queueUrl(DEFAULT_QUEUE_URL).messageBody(message1)));
83+
responses.add(batchManager.sendMessage(builder -> builder.queueUrl(DEFAULT_QUEUE_URL).messageBody(message2)));
8984
return responses;
9085
}
9186

9287
@Override
9388
public List<CompletableFuture<DeleteMessageResponse>> createAndSendDeleteMessageRequests() {
9489
List<DeleteMessageRequest> requests = new ArrayList<>();
95-
requests.add(createDeleteMessageRequest());
96-
requests.add(createDeleteMessageRequest());
9790
List<CompletableFuture<DeleteMessageResponse>> responses = new ArrayList<>();
98-
99-
for (DeleteMessageRequest request : requests) {
100-
responses.add(batchManager.deleteMessage(request));
101-
}
91+
responses.add(batchManager.deleteMessage(builder -> builder.queueUrl(DEFAULT_QUEUE_URL)));
92+
responses.add(batchManager.deleteMessage(builder -> builder.queueUrl(DEFAULT_QUEUE_URL)));
10293
return responses;
10394
}
10495

10596
@Override
10697
public List<CompletableFuture<ChangeMessageVisibilityResponse>> createAndSendChangeVisibilityRequests() {
107-
List<ChangeMessageVisibilityRequest> requests = new ArrayList<>();
108-
requests.add(createChangeVisibilityRequest());
109-
requests.add(createChangeVisibilityRequest());
110-
11198
List<CompletableFuture<ChangeMessageVisibilityResponse>> responses = new ArrayList<>();
112-
for (ChangeMessageVisibilityRequest request : requests) {
113-
responses.add(batchManager.changeMessageVisibility(request));
114-
}
115-
99+
responses.add(batchManager.changeMessageVisibility(builder -> builder.queueUrl(DEFAULT_QUEUE_URL)));
100+
responses.add(batchManager.changeMessageVisibility(builder -> builder.queueUrl(DEFAULT_QUEUE_URL)));
116101
return responses;
117102
}
118103
}

0 commit comments

Comments
 (0)