Skip to content

Commit 21b4a5d

Browse files
committed
Byte Based batching for SendMessage API
1 parent 63779a5 commit 21b4a5d

15 files changed

+155
-148
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder<Batch
3737
private final Integer maxBatchItems;
3838
private final Integer maxBatchKeys;
3939
private final Integer maxBufferSize;
40-
private final Duration batchSendRequestFrequency;
40+
private final Duration maxBatchOpenDuration;
4141
private final Duration visibilityTimeout;
4242
private final Duration longPollWaitTimeout;
4343
private final Duration minReceiveWaitTime;
@@ -51,8 +51,7 @@ private BatchOverrideConfiguration(Builder builder) {
5151
this.maxBatchItems = Validate.isPositiveOrNull(builder.maxBatchItems, "maxBatchItems");
5252
this.maxBatchKeys = Validate.isPositiveOrNull(builder.maxBatchKeys, "maxBatchKeys");
5353
this.maxBufferSize = Validate.isPositiveOrNull(builder.maxBufferSize, "maxBufferSize");
54-
this.batchSendRequestFrequency = Validate.isPositiveOrNull(builder.batchSendRequestFrequency,
55-
"batchSendRequestFrequency");
54+
this.maxBatchOpenDuration = Validate.isPositiveOrNull(builder.maxBatchOpenDuration, "maxBatchOpenDuration");
5655
this.visibilityTimeout = Validate.isPositiveOrNull(builder.visibilityTimeout, "visibilityTimeout");
5756
this.longPollWaitTimeout = Validate.isPositiveOrNull(builder.longPollWaitTimeout, "longPollWaitTimeout");
5857
this.minReceiveWaitTime = Validate.isPositiveOrNull(builder.minReceiveWaitTime, "minReceiveWaitTime");
@@ -95,8 +94,8 @@ public Integer maxDoneReceiveBatches() {
9594
/**
9695
* @return the optional maximum amount of time that an outgoing call waits to be batched with messages of the same type.
9796
*/
98-
public Duration batchSendRequestFrequency() {
99-
return batchSendRequestFrequency;
97+
public Duration maxBatchOpenDuration() {
98+
return maxBatchOpenDuration;
10099
}
101100

102101
/**
@@ -154,7 +153,7 @@ public Builder toBuilder() {
154153
return new Builder().maxBatchItems(maxBatchItems)
155154
.maxBatchKeys(maxBatchKeys)
156155
.maxBufferSize(maxBufferSize)
157-
.batchSendRequestFrequency(batchSendRequestFrequency)
156+
.maxBatchOpenDuration(maxBatchOpenDuration)
158157
.visibilityTimeout(visibilityTimeout)
159158
.longPollWaitTimeout(longPollWaitTimeout)
160159
.minReceiveWaitTime(minReceiveWaitTime)
@@ -171,7 +170,7 @@ public String toString() {
171170
.add("maxBatchItems", maxBatchItems)
172171
.add("maxBatchKeys", maxBatchKeys)
173172
.add("maxBufferSize", maxBufferSize)
174-
.add("batchSendRequestFrequency", batchSendRequestFrequency)
173+
.add("maxBatchOpenDuration", maxBatchOpenDuration)
175174
.add("visibilityTimeout", visibilityTimeout)
176175
.add("longPollWaitTimeout", longPollWaitTimeout)
177176
.add("minReceiveWaitTime", minReceiveWaitTime)
@@ -204,8 +203,8 @@ public boolean equals(Object o) {
204203
return false;
205204
}
206205

207-
if (batchSendRequestFrequency != null ? !batchSendRequestFrequency.equals(that.batchSendRequestFrequency) :
208-
that.batchSendRequestFrequency != null) {
206+
if (maxBatchOpenDuration != null ? !maxBatchOpenDuration.equals(that.maxBatchOpenDuration) :
207+
that.maxBatchOpenDuration != null) {
209208
return false;
210209
}
211210
if (visibilityTimeout != null ? !visibilityTimeout.equals(that.visibilityTimeout) :
@@ -245,7 +244,7 @@ public int hashCode() {
245244
int result = maxBatchItems != null ? maxBatchItems.hashCode() : 0;
246245
result = 31 * result + (maxBatchKeys != null ? maxBatchKeys.hashCode() : 0);
247246
result = 31 * result + (maxBufferSize != null ? maxBufferSize.hashCode() : 0);
248-
result = 31 * result + (batchSendRequestFrequency != null ? batchSendRequestFrequency.hashCode() : 0);
247+
result = 31 * result + (maxBatchOpenDuration != null ? maxBatchOpenDuration.hashCode() : 0);
249248
result = 31 * result + (visibilityTimeout != null ? visibilityTimeout.hashCode() : 0);
250249
result = 31 * result + (longPollWaitTimeout != null ? longPollWaitTimeout.hashCode() : 0);
251250
result = 31 * result + (minReceiveWaitTime != null ? minReceiveWaitTime.hashCode() : 0);
@@ -262,7 +261,7 @@ public static final class Builder implements CopyableBuilder<Builder, BatchOverr
262261
private Integer maxBatchItems;
263262
private Integer maxBatchKeys;
264263
private Integer maxBufferSize;
265-
private Duration batchSendRequestFrequency;
264+
private Duration maxBatchOpenDuration;
266265
private Duration visibilityTimeout;
267266
private Duration longPollWaitTimeout;
268267
private Duration minReceiveWaitTime;
@@ -316,11 +315,11 @@ public Builder maxBufferSize(Integer maxBufferSize) {
316315
* Define the maximum amount of time that an outgoing call waits for other requests before sending out a
317316
* batch request.
318317
* TODO : Decide if Ms needs to be added to the name in surface API review meeting
319-
* @param batchSendRequestFrequency The new batchSendRequestFrequency value.
318+
* @param maxBatchOpenDuration The new maxBatchOpenDuration value.
320319
* @return This object for method chaining.
321320
*/
322-
public Builder batchSendRequestFrequency(Duration batchSendRequestFrequency) {
323-
this.batchSendRequestFrequency = batchSendRequestFrequency;
321+
public Builder maxBatchOpenDuration(Duration maxBatchOpenDuration) {
322+
this.maxBatchOpenDuration = maxBatchOpenDuration;
324323
return this;
325324
}
326325

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,12 +25,12 @@ public final class BatchingExecutionContext<RequestT, ResponseT> {
2525
private final RequestT request;
2626
private final CompletableFuture<ResponseT> response;
2727

28-
private final Optional<Long> responsePayload;
28+
private final Optional<Integer> responsePayloadByteSize;
2929

3030
public BatchingExecutionContext(RequestT request, CompletableFuture<ResponseT> response) {
3131
this.request = request;
3232
this.response = response;
33-
responsePayload = RequestPayloadCalculator.calculateMessageSize(request);
33+
responsePayloadByteSize = RequestPayloadCalculator.calculateMessageSize(request);
3434
}
3535

3636
public RequestT request() {
@@ -42,7 +42,10 @@ public CompletableFuture<ResponseT> response() {
4242
}
4343

4444

45-
public Optional<Long> responsePayload() {
46-
return responsePayload;
45+
/**
46+
* Optional because responsePayloadByteSize is required only for SendMessageRequests and not for other requests.
47+
*/
48+
public Optional<Integer> responsePayloadByteSize() {
49+
return responsePayloadByteSize;
4750
}
4851
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,16 @@ public final class BatchingMap<RequestT, ResponseT> {
3434
private final int maxBatchKeys;
3535
private final int maxBatchBytesSize;
3636
private final int maxBatchSize;
37+
private final int maxBufferSize;
3738
private final Map<String, RequestBatchBuffer<RequestT, ResponseT>> batchContextMap;
39+
3840

39-
public BatchingMap(int maxBatchKeys,
40-
int maxBatchBytesSize,
41-
int maxBatchSize) {
41+
public BatchingMap(RequestBatchConfiguration overrideConfiguration) {
4242
this.batchContextMap = new ConcurrentHashMap<>();
43-
this.maxBatchKeys = maxBatchKeys;
44-
this.maxBatchBytesSize = maxBatchBytesSize;
45-
this.maxBatchSize = maxBatchSize;
43+
this.maxBatchKeys = overrideConfiguration.maxBatchKeys();
44+
this.maxBatchBytesSize = overrideConfiguration.maxBatchBytesSize();
45+
this.maxBatchSize = overrideConfiguration.maxBatchItems();
46+
this.maxBufferSize = overrideConfiguration.maxBufferSize();
4647
}
4748

4849
public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, RequestT request,
@@ -51,7 +52,7 @@ public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, Req
5152
if (batchContextMap.size() == maxBatchKeys) {
5253
throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys);
5354
}
54-
return new RequestBatchBuffer<>(scheduleFlush.get(), maxBatchSize, maxBatchBytesSize);
55+
return new RequestBatchBuffer<>(scheduleFlush.get(), maxBatchSize, maxBatchBytesSize, maxBufferSize);
5556
}).put(request, response);
5657
}
5758

@@ -67,12 +68,13 @@ public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>>
6768
batchContextMap.forEach(action);
6869
}
6970

70-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey, RequestT request) {
71-
return batchContextMap.get(batchKey).flushableRequests(request);
71+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequestsOnByteLimitBeforeAdd(String batchKey,
72+
RequestT request) {
73+
return batchContextMap.get(batchKey).flushableRequestsOnByteLimitBeforeAdd(request);
7274
}
7375

7476
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey) {
75-
return batchContextMap.get(batchKey).flushableRequests(null);
77+
return batchContextMap.get(batchKey).flushableRequests();
7678
}
7779

7880

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -48,27 +48,23 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
4848

4949
private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
5050
this.client = Validate.notNull(builder.client, "client cannot be null");
51-
5251
ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor;
53-
54-
RequestBatchConfiguration.Builder configBuilder =
55-
builder.overrideConfiguration != null ?
56-
RequestBatchConfiguration.builder()
57-
.batchSendRequestFrequency(builder.overrideConfiguration.batchSendRequestFrequency())
58-
.maxBatchItems(builder.overrideConfiguration.maxBatchItems())
59-
.maxBufferSize(builder.overrideConfiguration.maxBufferSize())
60-
.maxBatchKeys(builder.overrideConfiguration.maxBatchKeys())
61-
: RequestBatchConfiguration.builder();
62-
63-
this.sendMessageBatchManager = new SendMessageBatchManager(configBuilder
52+
this.sendMessageBatchManager = new SendMessageBatchManager(RequestBatchConfiguration
53+
.builder(builder.overrideConfiguration)
6454
.maxBatchBytesSize(MAX_PAYLOAD_SIZE_BYTES)
6555
.build(),
6656
scheduledExecutor,
6757
client);
68-
this.deleteMessageBatchManager = new DeleteMessageBatchManager(configBuilder.build(),
58+
this.deleteMessageBatchManager = new DeleteMessageBatchManager(RequestBatchConfiguration
59+
.builder(builder.overrideConfiguration)
60+
.build(),
6961
scheduledExecutor,
7062
client);
71-
this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(configBuilder.build(),
63+
this.changeMessageVisibilityBatchManager =
64+
new ChangeMessageVisibilityBatchManager(RequestBatchConfiguration
65+
.builder(builder.overrideConfiguration)
66+
// .maxBatchBytesSize(MAX_PAYLOAD_SIZE_BYTES)
67+
.build(),
7268
scheduledExecutor,
7369
client);
7470

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.List;
20-
import java.util.Map;
2120
import java.util.Optional;
2221
import java.util.concurrent.CompletableFuture;
2322
import java.util.concurrent.ScheduledExecutorService;
@@ -36,7 +35,6 @@
3635
import software.amazon.awssdk.services.sqs.model.SqsException;
3736
import software.amazon.awssdk.utils.Either;
3837

39-
4038
@SdkInternalApi
4139
public class DeleteMessageBatchManager extends RequestBatchManager<DeleteMessageRequest, DeleteMessageResponse,
4240
DeleteMessageBatchResponse> {
@@ -50,17 +48,6 @@ protected DeleteMessageBatchManager(RequestBatchConfiguration overrideConfigurat
5048
this.sqsAsyncClient = sqsAsyncClient;
5149
}
5250

53-
54-
private static boolean shouldFlush(Map<String, BatchingExecutionContext<DeleteMessageRequest,
55-
DeleteMessageResponse>> contextMap,
56-
DeleteMessageRequest request, RequestBatchConfiguration configuration) {
57-
if (request != null) {
58-
return false;
59-
}
60-
return contextMap.size() >= configuration.maxBatchItems();
61-
}
62-
63-
6451
private static DeleteMessageBatchRequest createDeleteMessageBatchRequest(
6552
List<IdentifiableMessage<DeleteMessageRequest>> identifiedRequests, String batchKey) {
6653
List<DeleteMessageBatchRequestEntry> entries = identifiedRequests

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java

Lines changed: 29 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,25 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18-
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SEND_MESSAGE_BATCH_SIZE;
1918

2019
import java.util.Collection;
20+
import java.util.Collections;
2121
import java.util.LinkedHashMap;
2222
import java.util.Map;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.ConcurrentHashMap;
2525
import java.util.concurrent.ScheduledFuture;
2626
import java.util.stream.Collectors;
2727
import software.amazon.awssdk.annotations.SdkInternalApi;
28+
import software.amazon.awssdk.utils.CollectionUtils;
2829

2930
@SdkInternalApi
3031
public final class RequestBatchBuffer<RequestT, ResponseT> {
3132
private final Object flushLock = new Object();
3233

3334
private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;
3435
private final int maxBatchItems;
36+
private final int maxBufferSize;
3537
private final int maxBatchSizeInBytes;
3638
/**
3739
* Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
@@ -52,47 +54,57 @@ public final class RequestBatchBuffer<RequestT, ResponseT> {
5254
private ScheduledFuture<?> scheduledFlush;
5355

5456
public RequestBatchBuffer(ScheduledFuture<?> scheduledFlush,
55-
int maxBatchItems, int maxBatchSizeInBytes) {
57+
int maxBatchItems, int maxBatchSizeInBytes, int maxBufferSize) {
5658
this.idToBatchContext = new ConcurrentHashMap<>();
5759
this.nextId = 0;
5860
this.nextBatchEntry = 0;
5961
this.scheduledFlush = scheduledFlush;
6062
this.maxBatchItems = maxBatchItems;
63+
this.maxBufferSize = maxBufferSize;
6164
this.maxBatchSizeInBytes = maxBatchSizeInBytes;
6265
}
6366

6467
/**
6568
* When request is null it checks if current contents in idToBatchContext are flushable. When request is passed it calculate
6669
* if the new request can overflow of ByteSize if yes then it flushes the messages
6770
*/
68-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(RequestT request) {
71+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests() {
6972
synchronized (flushLock) {
70-
if (isByteSizeThresholdCrossed(request) || isMaxBatchSizeLimitReached(request)) {
73+
if (isByteSizeThresholdCrossed(0) || isMaxBatchSizeLimitReached()) {
7174
return extractFlushedEntries(maxBatchItems);
7275
}
7376

74-
return new ConcurrentHashMap<>();
77+
return Collections.emptyMap();
7578
}
7679
}
7780

78-
private boolean isMaxBatchSizeLimitReached(RequestT request) {
79-
int batchSizeLimit = request != null ? this.maxBatchItems + 1 : this.maxBatchItems;
80-
return idToBatchContext.size() >= batchSizeLimit;
81+
private boolean isMaxBatchSizeLimitReached() {
82+
return idToBatchContext.size() >= this.maxBatchItems;
8183
}
8284

83-
private boolean isByteSizeThresholdCrossed(RequestT request) {
85+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequestsOnByteLimitBeforeAdd(RequestT request) {
86+
synchronized (flushLock) {
87+
if (maxBatchSizeInBytes > 0 && CollectionUtils.isNotEmpty(idToBatchContext)) {
88+
int incomingRequestBytes = RequestPayloadCalculator.calculateMessageSize(request).orElse(0);
89+
if (isByteSizeThresholdCrossed(incomingRequestBytes)) {
90+
return extractFlushedEntries(maxBatchItems);
91+
}
92+
}
93+
return Collections.emptyMap();
94+
}
95+
}
96+
97+
private boolean isByteSizeThresholdCrossed(int incomingRequestBytes) {
8498
if (maxBatchSizeInBytes < 0) {
8599
return false;
86100
}
87-
long incomingRequestBytes = RequestPayloadCalculator.calculateMessageSize(request).orElse(0L);
88-
long totalPayloadSize = idToBatchContext.values().stream()
89-
.map(BatchingExecutionContext::responsePayload)
90-
.mapToLong(opt -> opt.orElse(0L))
91-
.sum() + incomingRequestBytes;
101+
int totalPayloadSize = idToBatchContext.values().stream()
102+
.map(BatchingExecutionContext::responsePayloadByteSize)
103+
.mapToInt(opt -> opt.orElse(0))
104+
.sum() + incomingRequestBytes;
92105
return totalPayloadSize > maxBatchSizeInBytes;
93106
}
94107

95-
96108
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(int maxBatchItems) {
97109
synchronized (flushLock) {
98110
if (!idToBatchContext.isEmpty()) {
@@ -115,8 +127,8 @@ private Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractFlushe
115127

116128
public void put(RequestT request, CompletableFuture<ResponseT> response) {
117129
synchronized (this) {
118-
if (idToBatchContext.size() == MAX_SEND_MESSAGE_BATCH_SIZE) {
119-
throw new IllegalStateException("Reached MaxBufferSize of: " + MAX_SEND_MESSAGE_BATCH_SIZE);
130+
if (idToBatchContext.size() == maxBufferSize) {
131+
throw new IllegalStateException("Reached MaxBufferSize of: " + maxBufferSize);
120132
}
121133

122134
if (nextId == Integer.MAX_VALUE) {

0 commit comments

Comments
 (0)