Skip to content

Commit 63779a5

Browse files
committed
Byte Based batching for SendMessage API
1 parent 3f8a065 commit 63779a5

16 files changed

+388
-238
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ private BatchOverrideConfiguration(Builder builder) {
5151
this.maxBatchItems = Validate.isPositiveOrNull(builder.maxBatchItems, "maxBatchItems");
5252
this.maxBatchKeys = Validate.isPositiveOrNull(builder.maxBatchKeys, "maxBatchKeys");
5353
this.maxBufferSize = Validate.isPositiveOrNull(builder.maxBufferSize, "maxBufferSize");
54-
this.batchSendRequestFrequency = Validate.isPositiveOrNull(builder.batchSendRequestFrequency, "batchSendRequestFrequency");
54+
this.batchSendRequestFrequency = Validate.isPositiveOrNull(builder.batchSendRequestFrequency,
55+
"batchSendRequestFrequency");
5556
this.visibilityTimeout = Validate.isPositiveOrNull(builder.visibilityTimeout, "visibilityTimeout");
5657
this.longPollWaitTimeout = Validate.isPositiveOrNull(builder.longPollWaitTimeout, "longPollWaitTimeout");
5758
this.minReceiveWaitTime = Validate.isPositiveOrNull(builder.minReceiveWaitTime, "minReceiveWaitTime");

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import java.util.Optional;
1819
import java.util.concurrent.CompletableFuture;
1920
import software.amazon.awssdk.annotations.SdkInternalApi;
2021

@@ -24,12 +25,12 @@ public final class BatchingExecutionContext<RequestT, ResponseT> {
2425
private final RequestT request;
2526
private final CompletableFuture<ResponseT> response;
2627

27-
private final long responsePayload;
28+
private final Optional<Long> responsePayload;
2829

2930
public BatchingExecutionContext(RequestT request, CompletableFuture<ResponseT> response) {
3031
this.request = request;
3132
this.response = response;
32-
responsePayload = ResponsePayloadCalculator.calculateMessageSize(request);
33+
responsePayload = RequestPayloadCalculator.calculateMessageSize(request);
3334
}
3435

3536
public RequestT request() {
@@ -41,7 +42,7 @@ public CompletableFuture<ResponseT> response() {
4142
}
4243

4344

44-
public long responsePayload() {
45+
public Optional<Long> responsePayload() {
4546
return responsePayload;
4647
}
4748
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import java.util.concurrent.ConcurrentHashMap;
2121
import java.util.concurrent.ScheduledFuture;
2222
import java.util.function.BiConsumer;
23-
import java.util.function.BiPredicate;
24-
import java.util.function.Predicate;
2523
import java.util.function.Supplier;
2624
import software.amazon.awssdk.annotations.SdkInternalApi;
2725

@@ -34,15 +32,17 @@
3432
public final class BatchingMap<RequestT, ResponseT> {
3533

3634
private final int maxBatchKeys;
35+
private final int maxBatchBytesSize;
36+
private final int maxBatchSize;
3737
private final Map<String, RequestBatchBuffer<RequestT, ResponseT>> batchContextMap;
38-
private final BiPredicate<Map<String, BatchingExecutionContext<RequestT, ResponseT>>, RequestT> flushCondition ;
39-
4038

4139
public BatchingMap(int maxBatchKeys,
42-
BiPredicate<Map<String, BatchingExecutionContext<RequestT, ResponseT>>, RequestT> flushCondition) {
40+
int maxBatchBytesSize,
41+
int maxBatchSize) {
4342
this.batchContextMap = new ConcurrentHashMap<>();
4443
this.maxBatchKeys = maxBatchKeys;
45-
this.flushCondition = flushCondition;
44+
this.maxBatchBytesSize = maxBatchBytesSize;
45+
this.maxBatchSize = maxBatchSize;
4646
}
4747

4848
public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, RequestT request,
@@ -51,10 +51,14 @@ public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, Req
5151
if (batchContextMap.size() == maxBatchKeys) {
5252
throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys);
5353
}
54-
return new RequestBatchBuffer<>(scheduleFlush.get(), flushCondition);
54+
return new RequestBatchBuffer<>(scheduleFlush.get(), maxBatchSize, maxBatchBytesSize);
5555
}).put(request, response);
5656
}
5757

58+
public boolean contains(String batchKey) {
59+
return batchContextMap.containsKey(batchKey);
60+
}
61+
5862
public void putScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
5963
batchContextMap.get(batchKey).putScheduledFlush(scheduledFlush);
6064
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java

Lines changed: 7 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,14 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.List;
20-
import java.util.Map;
2120
import java.util.Optional;
2221
import java.util.concurrent.CompletableFuture;
2322
import java.util.concurrent.ScheduledExecutorService;
24-
import java.util.function.BiPredicate;
2523
import java.util.stream.Collectors;
2624
import software.amazon.awssdk.annotations.SdkInternalApi;
2725
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
2826
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
2927
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
30-
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
3128
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
3229
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
3330
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
@@ -49,22 +46,10 @@ public class ChangeMessageVisibilityBatchManager extends RequestBatchManager<Cha
4946
protected ChangeMessageVisibilityBatchManager(RequestBatchConfiguration overrideConfiguration,
5047
ScheduledExecutorService scheduledExecutor,
5148
SqsAsyncClient sqsAsyncClient) {
52-
super(overrideConfiguration,
53-
scheduledExecutor, (stringBatchingExecutionContextMap, changeMessageVisibilityRequest)
54-
-> shouldFlush(stringBatchingExecutionContextMap, changeMessageVisibilityRequest, overrideConfiguration)
55-
);
49+
super(overrideConfiguration, scheduledExecutor);
5650
this.sqsAsyncClient = sqsAsyncClient;
5751
}
5852

59-
private static boolean shouldFlush(Map<String, BatchingExecutionContext<ChangeMessageVisibilityRequest,
60-
ChangeMessageVisibilityResponse>> contextMap,
61-
ChangeMessageVisibilityRequest request, RequestBatchConfiguration configuration) {
62-
if (request != null) {
63-
return false;
64-
}
65-
return contextMap.size() >= configuration.maxBatchItems();
66-
}
67-
6853
private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibilityBatchRequest(
6954
List<IdentifiableMessage<ChangeMessageVisibilityRequest>> identifiedRequests, String batchKey) {
7055
List<ChangeMessageVisibilityBatchRequestEntry> entries = identifiedRequests
@@ -79,10 +64,10 @@ private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibility
7964
.overrideConfiguration();
8065
return overrideConfiguration.map(
8166
config -> ChangeMessageVisibilityBatchRequest.builder()
82-
.queueUrl(batchKey)
83-
.overrideConfiguration(config)
84-
.entries(entries)
85-
.build())
67+
.queueUrl(batchKey)
68+
.overrideConfiguration(config)
69+
.entries(entries)
70+
.build())
8671
.orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder()
8772
.queueUrl(batchKey)
8873
.entries(entries)
@@ -119,7 +104,6 @@ private static IdentifiableMessage<Throwable> changeMessageVisibilityCreateThrow
119104
}
120105

121106

122-
123107
@Override
124108
protected CompletableFuture<ChangeMessageVisibilityBatchResponse> batchAndSend(
125109
List<IdentifiableMessage<ChangeMessageVisibilityRequest>> identifiedRequests, String batchKey) {
@@ -130,8 +114,8 @@ protected CompletableFuture<ChangeMessageVisibilityBatchResponse> batchAndSend(
130114

131115
@Override
132116
protected String getBatchKey(ChangeMessageVisibilityRequest request) {
133-
return request.overrideConfiguration().map(overrideConfig -> request.queueUrl() + overrideConfig.hashCode())
134-
.orElseGet(request::queueUrl);
117+
return request.overrideConfiguration().map(overrideConfig -> request.queueUrl() + overrideConfig.hashCode())
118+
.orElseGet(request::queueUrl);
135119
}
136120

137121
@Override

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_PAYLOAD_SIZE_BYTES;
19+
1820
import java.util.concurrent.CompletableFuture;
1921
import java.util.concurrent.ScheduledExecutorService;
2022
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -49,13 +51,24 @@ private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
4951

5052
ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor;
5153

52-
this.sendMessageBatchManager = new SendMessageBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration),
54+
RequestBatchConfiguration.Builder configBuilder =
55+
builder.overrideConfiguration != null ?
56+
RequestBatchConfiguration.builder()
57+
.batchSendRequestFrequency(builder.overrideConfiguration.batchSendRequestFrequency())
58+
.maxBatchItems(builder.overrideConfiguration.maxBatchItems())
59+
.maxBufferSize(builder.overrideConfiguration.maxBufferSize())
60+
.maxBatchKeys(builder.overrideConfiguration.maxBatchKeys())
61+
: RequestBatchConfiguration.builder();
62+
63+
this.sendMessageBatchManager = new SendMessageBatchManager(configBuilder
64+
.maxBatchBytesSize(MAX_PAYLOAD_SIZE_BYTES)
65+
.build(),
5366
scheduledExecutor,
5467
client);
55-
this.deleteMessageBatchManager = new DeleteMessageBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration),
68+
this.deleteMessageBatchManager = new DeleteMessageBatchManager(configBuilder.build(),
5669
scheduledExecutor,
5770
client);
58-
this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration),
71+
this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(configBuilder.build(),
5972
scheduledExecutor,
6073
client);
6174

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,6 @@
2727
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
2828
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2929
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
30-
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
31-
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse;
3230
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
3331
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
3432
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
@@ -48,9 +46,7 @@ public class DeleteMessageBatchManager extends RequestBatchManager<DeleteMessage
4846
protected DeleteMessageBatchManager(RequestBatchConfiguration overrideConfiguration,
4947
ScheduledExecutorService scheduledExecutor,
5048
SqsAsyncClient sqsAsyncClient) {
51-
super(overrideConfiguration, scheduledExecutor, (stringBatchingExecutionContextMap, changeMessageVisibilityRequest)
52-
-> shouldFlush(stringBatchingExecutionContextMap, changeMessageVisibilityRequest, overrideConfiguration)
53-
);
49+
super(overrideConfiguration, scheduledExecutor);
5450
this.sqsAsyncClient = sqsAsyncClient;
5551
}
5652

@@ -65,7 +61,6 @@ private static boolean shouldFlush(Map<String, BatchingExecutionContext<DeleteMe
6561
}
6662

6763

68-
6964
private static DeleteMessageBatchRequest createDeleteMessageBatchRequest(
7065
List<IdentifiableMessage<DeleteMessageRequest>> identifiedRequests, String batchKey) {
7166
List<DeleteMessageBatchRequestEntry> entries = identifiedRequests

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,32 +23,26 @@
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.ConcurrentHashMap;
2525
import java.util.concurrent.ScheduledFuture;
26-
import java.util.function.BiPredicate;
27-
import java.util.function.Predicate;
2826
import java.util.stream.Collectors;
2927
import software.amazon.awssdk.annotations.SdkInternalApi;
3028

3129
@SdkInternalApi
32-
public final class RequestBatchBuffer<RequestT, ResponseT> {
30+
public final class RequestBatchBuffer<RequestT, ResponseT> {
3331
private final Object flushLock = new Object();
3432

3533
private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;
36-
37-
38-
private BiPredicate<Map<String, BatchingExecutionContext<RequestT, ResponseT>>, RequestT> flushCondition ;
39-
34+
private final int maxBatchItems;
35+
private final int maxBatchSizeInBytes;
4036
/**
4137
* Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
4238
* BatchingExecutionContext. For simplicity, the ID is just an integer that is incremented everytime a new request and
4339
* response pair is received.
4440
*/
4541
private int nextId;
46-
private int maxBatchItems;
47-
4842
/**
49-
* Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a
50-
* request that already exists in the idToBatchContext map since it refers to the next entry (ex. if the last entry added
51-
* to idToBatchContext had an id of 22, nextBatchEntry will have a value of 23).
43+
* Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a request
44+
* that already exists in the idToBatchContext map since it refers to the next entry (ex. if the last entry added to
45+
* idToBatchContext had an id of 22, nextBatchEntry will have a value of 23).
5246
*/
5347
private int nextBatchEntry;
5448

@@ -58,27 +52,50 @@ public final class RequestBatchBuffer<RequestT, ResponseT> {
5852
private ScheduledFuture<?> scheduledFlush;
5953

6054
public RequestBatchBuffer(ScheduledFuture<?> scheduledFlush,
61-
BiPredicate<Map<String, BatchingExecutionContext<RequestT, ResponseT>>, RequestT> flushCondition) {
55+
int maxBatchItems, int maxBatchSizeInBytes) {
6256
this.idToBatchContext = new ConcurrentHashMap<>();
6357
this.nextId = 0;
6458
this.nextBatchEntry = 0;
6559
this.scheduledFlush = scheduledFlush;
66-
this.flushCondition = flushCondition;
60+
this.maxBatchItems = maxBatchItems;
61+
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
6762
}
6863

64+
/**
65+
* When request is null it checks if current contents in idToBatchContext are flushable. When request is passed it calculate
66+
* if the new request can overflow of ByteSize if yes then it flushes the messages
67+
*/
6968
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(RequestT request) {
7069
synchronized (flushLock) {
71-
if (flushCondition.test(idToBatchContext, request)) {
70+
if (isByteSizeThresholdCrossed(request) || isMaxBatchSizeLimitReached(request)) {
7271
return extractFlushedEntries(maxBatchItems);
7372
}
73+
7474
return new ConcurrentHashMap<>();
7575
}
7676
}
7777

78+
private boolean isMaxBatchSizeLimitReached(RequestT request) {
79+
int batchSizeLimit = request != null ? this.maxBatchItems + 1 : this.maxBatchItems;
80+
return idToBatchContext.size() >= batchSizeLimit;
81+
}
82+
83+
private boolean isByteSizeThresholdCrossed(RequestT request) {
84+
if (maxBatchSizeInBytes < 0) {
85+
return false;
86+
}
87+
long incomingRequestBytes = RequestPayloadCalculator.calculateMessageSize(request).orElse(0L);
88+
long totalPayloadSize = idToBatchContext.values().stream()
89+
.map(BatchingExecutionContext::responsePayload)
90+
.mapToLong(opt -> opt.orElse(0L))
91+
.sum() + incomingRequestBytes;
92+
return totalPayloadSize > maxBatchSizeInBytes;
93+
}
94+
7895

7996
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(int maxBatchItems) {
8097
synchronized (flushLock) {
81-
if (idToBatchContext.size() > 0) {
98+
if (!idToBatchContext.isEmpty()) {
8299
return extractFlushedEntries(maxBatchItems);
83100
}
84101
return new ConcurrentHashMap<>();

0 commit comments

Comments
 (0)