Skip to content

Commit 3f8a065

Browse files
committed
Initial changes 2
1 parent 2d8966f commit 3f8a065

12 files changed

+259
-46
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingExecutionContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,12 @@ public final class BatchingExecutionContext<RequestT, ResponseT> {
2424
private final RequestT request;
2525
private final CompletableFuture<ResponseT> response;
2626

27+
private final long responsePayload;
28+
2729
public BatchingExecutionContext(RequestT request, CompletableFuture<ResponseT> response) {
2830
this.request = request;
2931
this.response = response;
32+
responsePayload = ResponsePayloadCalculator.calculateMessageSize(request);
3033
}
3134

3235
public RequestT request() {
@@ -36,4 +39,9 @@ public RequestT request() {
3639
public CompletableFuture<ResponseT> response() {
3740
return response;
3841
}
42+
43+
44+
public long responsePayload() {
45+
return responsePayload;
46+
}
3947
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import java.util.concurrent.ConcurrentHashMap;
2121
import java.util.concurrent.ScheduledFuture;
2222
import java.util.function.BiConsumer;
23+
import java.util.function.BiPredicate;
24+
import java.util.function.Predicate;
2325
import java.util.function.Supplier;
2426
import software.amazon.awssdk.annotations.SdkInternalApi;
2527

@@ -32,13 +34,15 @@
3234
public final class BatchingMap<RequestT, ResponseT> {
3335

3436
private final int maxBatchKeys;
35-
private final int maxBufferSize;
3637
private final Map<String, RequestBatchBuffer<RequestT, ResponseT>> batchContextMap;
38+
private final BiPredicate<Map<String, BatchingExecutionContext<RequestT, ResponseT>>, RequestT> flushCondition ;
3739

38-
public BatchingMap(int maxBatchKeys, int maxBufferSize) {
40+
41+
public BatchingMap(int maxBatchKeys,
42+
BiPredicate<Map<String, BatchingExecutionContext<RequestT, ResponseT>>, RequestT> flushCondition) {
3943
this.batchContextMap = new ConcurrentHashMap<>();
4044
this.maxBatchKeys = maxBatchKeys;
41-
this.maxBufferSize = maxBufferSize;
45+
this.flushCondition = flushCondition;
4246
}
4347

4448
public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, RequestT request,
@@ -47,7 +51,7 @@ public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, Req
4751
if (batchContextMap.size() == maxBatchKeys) {
4852
throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys);
4953
}
50-
return new RequestBatchBuffer<>(maxBufferSize, scheduleFlush.get());
54+
return new RequestBatchBuffer<>(scheduleFlush.get(), flushCondition);
5155
}).put(request, response);
5256
}
5357

@@ -59,11 +63,15 @@ public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>>
5963
batchContextMap.forEach(action);
6064
}
6165

62-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey,
63-
int maxBatchItems) {
64-
return batchContextMap.get(batchKey).flushableRequests(maxBatchItems);
66+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey, RequestT request) {
67+
return batchContextMap.get(batchKey).flushableRequests(request);
68+
}
69+
70+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey) {
71+
return batchContextMap.get(batchKey).flushableRequests(null);
6572
}
6673

74+
6775
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(String batchKey,
6876
int maxBatchItems) {
6977
return batchContextMap.get(batchKey).flushableScheduledRequests(maxBatchItems);

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ChangeMessageVisibilityBatchManager.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.List;
20+
import java.util.Map;
2021
import java.util.Optional;
2122
import java.util.concurrent.CompletableFuture;
2223
import java.util.concurrent.ScheduledExecutorService;
24+
import java.util.function.BiPredicate;
2325
import java.util.stream.Collectors;
2426
import software.amazon.awssdk.annotations.SdkInternalApi;
2527
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
@@ -43,13 +45,26 @@ public class ChangeMessageVisibilityBatchManager extends RequestBatchManager<Cha
4345

4446
private final SqsAsyncClient sqsAsyncClient;
4547

46-
protected ChangeMessageVisibilityBatchManager(BatchOverrideConfiguration overrideConfiguration,
48+
49+
protected ChangeMessageVisibilityBatchManager(RequestBatchConfiguration overrideConfiguration,
4750
ScheduledExecutorService scheduledExecutor,
4851
SqsAsyncClient sqsAsyncClient) {
49-
super(overrideConfiguration, scheduledExecutor);
52+
super(overrideConfiguration,
53+
scheduledExecutor, (stringBatchingExecutionContextMap, changeMessageVisibilityRequest)
54+
-> shouldFlush(stringBatchingExecutionContextMap, changeMessageVisibilityRequest, overrideConfiguration)
55+
);
5056
this.sqsAsyncClient = sqsAsyncClient;
5157
}
5258

59+
private static boolean shouldFlush(Map<String, BatchingExecutionContext<ChangeMessageVisibilityRequest,
60+
ChangeMessageVisibilityResponse>> contextMap,
61+
ChangeMessageVisibilityRequest request, RequestBatchConfiguration configuration) {
62+
if (request != null) {
63+
return false;
64+
}
65+
return contextMap.size() >= configuration.maxBatchItems();
66+
}
67+
5368
private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibilityBatchRequest(
5469
List<IdentifiableMessage<ChangeMessageVisibilityRequest>> identifiedRequests, String batchKey) {
5570
List<ChangeMessageVisibilityBatchRequestEntry> entries = identifiedRequests

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
4949

5050
ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor;
5151

52-
this.sendMessageBatchManager = new SendMessageBatchManager(builder.overrideConfiguration,
52+
this.sendMessageBatchManager = new SendMessageBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration),
5353
scheduledExecutor,
5454
client);
55-
this.deleteMessageBatchManager = new DeleteMessageBatchManager(builder.overrideConfiguration,
55+
this.deleteMessageBatchManager = new DeleteMessageBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration),
5656
scheduledExecutor,
5757
client);
58-
this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(builder.overrideConfiguration,
58+
this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(new RequestBatchConfiguration(builder.overrideConfiguration),
5959
scheduledExecutor,
6060
client);
6161

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DeleteMessageBatchManager.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import java.util.ArrayList;
1919
import java.util.List;
20+
import java.util.Map;
2021
import java.util.Optional;
2122
import java.util.concurrent.CompletableFuture;
2223
import java.util.concurrent.ScheduledExecutorService;
@@ -25,8 +26,9 @@
2526
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
2627
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
2728
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
28-
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
2929
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
30+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
31+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse;
3032
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
3133
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
3234
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
@@ -43,13 +45,27 @@ public class DeleteMessageBatchManager extends RequestBatchManager<DeleteMessage
4345

4446
private final SqsAsyncClient sqsAsyncClient;
4547

46-
protected DeleteMessageBatchManager(BatchOverrideConfiguration overrideConfiguration,
48+
protected DeleteMessageBatchManager(RequestBatchConfiguration overrideConfiguration,
4749
ScheduledExecutorService scheduledExecutor,
4850
SqsAsyncClient sqsAsyncClient) {
49-
super(overrideConfiguration, scheduledExecutor);
51+
super(overrideConfiguration, scheduledExecutor, (stringBatchingExecutionContextMap, changeMessageVisibilityRequest)
52+
-> shouldFlush(stringBatchingExecutionContextMap, changeMessageVisibilityRequest, overrideConfiguration)
53+
);
5054
this.sqsAsyncClient = sqsAsyncClient;
5155
}
5256

57+
58+
private static boolean shouldFlush(Map<String, BatchingExecutionContext<DeleteMessageRequest,
59+
DeleteMessageResponse>> contextMap,
60+
DeleteMessageRequest request, RequestBatchConfiguration configuration) {
61+
if (request != null) {
62+
return false;
63+
}
64+
return contextMap.size() >= configuration.maxBatchItems();
65+
}
66+
67+
68+
5369
private static DeleteMessageBatchRequest createDeleteMessageBatchRequest(
5470
List<IdentifiableMessage<DeleteMessageRequest>> identifiedRequests, String batchKey) {
5571
List<DeleteMessageBatchRequestEntry> entries = identifiedRequests

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,16 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SEND_MESSAGE_BATCH_SIZE;
19+
1820
import java.util.Collection;
1921
import java.util.LinkedHashMap;
2022
import java.util.Map;
2123
import java.util.concurrent.CompletableFuture;
2224
import java.util.concurrent.ConcurrentHashMap;
2325
import java.util.concurrent.ScheduledFuture;
26+
import java.util.function.BiPredicate;
27+
import java.util.function.Predicate;
2428
import java.util.stream.Collectors;
2529
import software.amazon.awssdk.annotations.SdkInternalApi;
2630

@@ -30,17 +34,16 @@ public final class RequestBatchBuffer<RequestT, ResponseT> {
3034

3135
private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;
3236

33-
/**
34-
* Maximum number of elements that can be included in the BatchBuffer.
35-
*/
36-
private final int maxBufferSize;
37+
38+
private BiPredicate<Map<String, BatchingExecutionContext<RequestT, ResponseT>>, RequestT> flushCondition ;
3739

3840
/**
3941
* Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
4042
* BatchingExecutionContext. For simplicity, the ID is just an integer that is incremented everytime a new request and
4143
* response pair is received.
4244
*/
4345
private int nextId;
46+
private int maxBatchItems;
4447

4548
/**
4649
* Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a
@@ -54,23 +57,25 @@ public final class RequestBatchBuffer<RequestT, ResponseT> {
5457
*/
5558
private ScheduledFuture<?> scheduledFlush;
5659

57-
public RequestBatchBuffer(int maxBufferSize, ScheduledFuture<?> scheduledFlush) {
60+
public RequestBatchBuffer(ScheduledFuture<?> scheduledFlush,
61+
BiPredicate<Map<String, BatchingExecutionContext<RequestT, ResponseT>>, RequestT> flushCondition) {
5862
this.idToBatchContext = new ConcurrentHashMap<>();
59-
this.maxBufferSize = maxBufferSize;
6063
this.nextId = 0;
6164
this.nextBatchEntry = 0;
6265
this.scheduledFlush = scheduledFlush;
66+
this.flushCondition = flushCondition;
6367
}
6468

65-
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(int maxBatchItems) {
69+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(RequestT request) {
6670
synchronized (flushLock) {
67-
if (idToBatchContext.size() >= maxBatchItems) {
71+
if (flushCondition.test(idToBatchContext, request)) {
6872
return extractFlushedEntries(maxBatchItems);
6973
}
7074
return new ConcurrentHashMap<>();
7175
}
7276
}
7377

78+
7479
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(int maxBatchItems) {
7580
synchronized (flushLock) {
7681
if (idToBatchContext.size() > 0) {
@@ -93,8 +98,8 @@ private Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractFlushe
9398

9499
public void put(RequestT request, CompletableFuture<ResponseT> response) {
95100
synchronized (this) {
96-
if (idToBatchContext.size() == maxBufferSize) {
97-
throw new IllegalStateException("Reached MaxBufferSize of: " + maxBufferSize);
101+
if (idToBatchContext.size() == MAX_SEND_MESSAGE_BATCH_SIZE) {
102+
throw new IllegalStateException("Reached MaxBufferSize of: " + MAX_SEND_MESSAGE_BATCH_SIZE);
98103
}
99104

100105
if (nextId == Integer.MAX_VALUE) {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchConfiguration.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@
2424
public final class RequestBatchConfiguration {
2525

2626
public static final int DEFAULT_MAX_BATCH_ITEMS = 10;
27-
public static final int DEFAULT_MAX_BATCH_KEYS = 100;
27+
public static final int DEFAULT_MAX_BATCH_KEYS = 1000;
2828
public static final int DEFAULT_MAX_BUFFER_SIZE = 500;
2929
public static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(200);
3030

3131
private final Integer maxBatchItems;
3232
private final Integer maxBatchKeys;
3333
private final Integer maxBufferSize;
34-
private final Duration maxBatchOpenDuration;
34+
private final Duration batchSendRequestFrequency;
3535

3636
public RequestBatchConfiguration(BatchOverrideConfiguration overrideConfiguration) {
3737
this.maxBatchItems = Optional.ofNullable(overrideConfiguration)
@@ -46,13 +46,13 @@ public RequestBatchConfiguration(BatchOverrideConfiguration overrideConfiguratio
4646
.map(BatchOverrideConfiguration::maxBufferSize)
4747
.orElse(DEFAULT_MAX_BUFFER_SIZE);
4848

49-
this.maxBatchOpenDuration = Optional.ofNullable(overrideConfiguration)
50-
.map(BatchOverrideConfiguration::maxBatchOpenDuration)
49+
this.batchSendRequestFrequency = Optional.ofNullable(overrideConfiguration)
50+
.map(BatchOverrideConfiguration::batchSendRequestFrequency)
5151
.orElse(DEFAULT_MAX_BATCH_OPEN_IN_MS);
5252
}
5353

54-
public Duration maxBatchOpenDuration() {
55-
return maxBatchOpenDuration;
54+
public Duration batchSendRequestFrequency() {
55+
return batchSendRequestFrequency;
5656
}
5757

5858
public int maxBatchItems() {

0 commit comments

Comments
 (0)