Skip to content

Commit 2adad8d

Browse files
16lim21Michael Li
andauthored
Refactoring the core BatchManager to change how it handles batch entry failures. (#2664)
* Moving protected APIs out of internal package * Refactoring test import * Adding configurable option for users to specify batchKey and batchBuffer limit * Modifying batchBuffer and batchingMap to handle limits * Returning exception on batch entry failure * Modifying test to expect exception on batch entry failure * Refactoring core DefaultBatchManager Allows for either an exception or response to be returned by the responseMapper. Also adding tests and option to specify batchKey and batchBuffer limits. * Addressing PR comments * Adding tests to increase test coverage * Addressing PR comments and fixing build errors Co-authored-by: Michael Li <[email protected]>
1 parent 14a7cb6 commit 2adad8d

File tree

16 files changed

+311
-67
lines changed

16 files changed

+311
-67
lines changed
Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,12 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.core.internal.batchmanager;
16+
package software.amazon.awssdk.core.batchmanager;
1717

1818
import java.util.List;
1919
import java.util.concurrent.CompletableFuture;
2020
import software.amazon.awssdk.annotations.SdkProtectedApi;
21+
import software.amazon.awssdk.core.batchmanager.IdentifiableMessage;
2122

2223
/**
2324
* Takes a list of identified requests in addition to a destination and batches the requests into a batch request.
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.core.internal.batchmanager;
16+
package software.amazon.awssdk.core.batchmanager;
1717

1818
import software.amazon.awssdk.annotations.SdkProtectedApi;
1919

core/sdk-core/src/main/java/software/amazon/awssdk/core/batchmanager/BatchManagerBuilder.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@
1717

1818
import java.util.concurrent.ScheduledExecutorService;
1919
import software.amazon.awssdk.annotations.SdkProtectedApi;
20-
import software.amazon.awssdk.core.internal.batchmanager.BatchAndSend;
21-
import software.amazon.awssdk.core.internal.batchmanager.BatchKeyMapper;
22-
import software.amazon.awssdk.core.internal.batchmanager.BatchResponseMapper;
2320

2421
@SdkProtectedApi
2522
public interface BatchManagerBuilder<RequestT, ResponseT, BatchResponseT, B> {

core/sdk-core/src/main/java/software/amazon/awssdk/core/batchmanager/BatchOverrideConfiguration.java

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,15 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder<Batch
3333
BatchOverrideConfiguration> {
3434

3535
private final Integer maxBatchItems;
36+
private final Integer maxBatchKeys;
37+
private final Integer maxBufferSize;
3638
private final Duration maxBatchOpenInMs;
3739

3840
public BatchOverrideConfiguration(Builder builder) {
3941
this.maxBatchItems = Validate.isPositiveOrNull(builder.maxBatchItems, "maxBatchItems");
4042
this.maxBatchOpenInMs = Validate.isPositiveOrNull(builder.maxBatchOpenInMs, "maxBachOpenInMs");
43+
this.maxBatchKeys = Validate.isPositiveOrNull(builder.maxBatchKeys, "maxBatchKeys");
44+
this.maxBufferSize = Validate.isPositiveOrNull(builder.maxBufferSize, "maxBufferSize");
4145
}
4246

4347
public static Builder builder() {
@@ -51,6 +55,20 @@ public Optional<Integer> maxBatchItems() {
5155
return Optional.ofNullable(maxBatchItems);
5256
}
5357

58+
/**
59+
* @return the optional maximum number of batchKeys to keep track of.
60+
*/
61+
public Optional<Integer> maxBatchKeys() {
62+
return Optional.ofNullable(maxBatchKeys);
63+
}
64+
65+
/**
66+
* @return the maximum number of items to allow to be buffered for each batchKey.
67+
*/
68+
public Optional<Integer> maxBufferSize() {
69+
return Optional.ofNullable(maxBufferSize);
70+
}
71+
5472
/**
5573
* @return the optional maximum amount of time (in milliseconds) that an outgoing call waits to be batched with messages of
5674
* the same type.
@@ -62,13 +80,17 @@ public Optional<Duration> maxBatchOpenInMs() {
6280
@Override
6381
public Builder toBuilder() {
6482
return new Builder().maxBatchItems(maxBatchItems)
83+
.maxBatchKeys(maxBatchKeys)
84+
.maxBufferSize(maxBufferSize)
6585
.maxBatchOpenInMs(maxBatchOpenInMs);
6686
}
6787

6888
@Override
6989
public String toString() {
7090
return ToString.builder("BatchOverrideConfiguration")
7191
.add("maxBatchItems", maxBatchItems)
92+
.add("maxBatchKeys", maxBatchKeys)
93+
.add("maxBufferSize", maxBufferSize)
7294
.add("maxBatchOpenInMs", maxBatchOpenInMs.toMillis())
7395
.build();
7496
}
@@ -87,19 +109,29 @@ public boolean equals(Object o) {
87109
if (maxBatchItems != null ? !maxBatchItems.equals(that.maxBatchItems) : that.maxBatchItems != null) {
88110
return false;
89111
}
112+
if (maxBatchKeys != null ? !maxBatchKeys.equals(that.maxBatchKeys) : that.maxBatchKeys != null) {
113+
return false;
114+
}
115+
if (maxBufferSize != null ? !maxBufferSize.equals(that.maxBufferSize) : that.maxBufferSize != null) {
116+
return false;
117+
}
90118
return maxBatchOpenInMs != null ? maxBatchOpenInMs.equals(that.maxBatchOpenInMs) : that.maxBatchOpenInMs == null;
91119
}
92120

93121
@Override
94122
public int hashCode() {
95123
int result = maxBatchItems != null ? maxBatchItems.hashCode() : 0;
124+
result = 31 * result + (maxBatchKeys != null ? maxBatchKeys.hashCode() : 0);
125+
result = 31 * result + (maxBufferSize != null ? maxBufferSize.hashCode() : 0);
96126
result = 31 * result + (maxBatchOpenInMs != null ? maxBatchOpenInMs.hashCode() : 0);
97127
return result;
98128
}
99129

100130
public static final class Builder implements CopyableBuilder<Builder, BatchOverrideConfiguration> {
101131

102132
private Integer maxBatchItems;
133+
private Integer maxBatchKeys;
134+
private Integer maxBufferSize;
103135
private Duration maxBatchOpenInMs;
104136

105137
private Builder() {
@@ -116,6 +148,32 @@ public Builder maxBatchItems(Integer maxBatchItems) {
116148
return this;
117149
}
118150

151+
/**
152+
* Define the maximum number of batchKeys to keep track of. A batchKey determines which requests are batched together
153+
* and is calculated by the client based on the information in a request.
154+
* <p>
155+
* Ex. SQS determines a batchKey based on a request's queueUrl in combination with its overrideConfiguration, so
156+
* requests with the same queueUrl and overrideConfiguration will have the same batchKey and be batched together.
157+
*
158+
* @param maxBatchKeys the new maxBatchKeys value.
159+
* @return This object for method chaining.
160+
*/
161+
public Builder maxBatchKeys(Integer maxBatchKeys) {
162+
this.maxBatchKeys = maxBatchKeys;
163+
return this;
164+
}
165+
166+
/**
167+
* Define the maximum number of items to allow to be buffered for each batchKey.
168+
*
169+
* @param maxBufferSize the new maxBufferSize value.
170+
* @return This object for method chaining.
171+
*/
172+
public Builder maxBufferSize(Integer maxBufferSize) {
173+
this.maxBufferSize = maxBufferSize;
174+
return this;
175+
}
176+
119177
/**
120178
* The maximum amount of time (in milliseconds) that an outgoing call waits for other requests before sending out a batch
121179
* request.
@@ -128,6 +186,7 @@ public Builder maxBatchOpenInMs(Duration maxBatchOpenInMs) {
128186
return this;
129187
}
130188

189+
131190
public BatchOverrideConfiguration build() {
132191
return new BatchOverrideConfiguration(this);
133192
}
Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,12 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.core.internal.batchmanager;
16+
package software.amazon.awssdk.core.batchmanager;
1717

1818
import java.util.List;
1919
import software.amazon.awssdk.annotations.SdkProtectedApi;
20+
import software.amazon.awssdk.core.batchmanager.IdentifiableMessage;
21+
import software.amazon.awssdk.utils.Either;
2022

2123
/**
2224
* Unpacks the batch response, then transforms individual entries to the appropriate response type. Each entry's batch ID
@@ -27,5 +29,5 @@
2729
@FunctionalInterface
2830
@SdkProtectedApi
2931
public interface BatchResponseMapper<BatchResponseT, ResponseT> {
30-
List<IdentifiableMessage<ResponseT>> mapBatchResponse(BatchResponseT batchResponse);
32+
List<Either<IdentifiableMessage<ResponseT>, IdentifiableMessage<Throwable>>> mapBatchResponse(BatchResponseT batchResponse);
3133
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.core.internal.batchmanager;
16+
package software.amazon.awssdk.core.batchmanager;
1717

1818
import software.amazon.awssdk.annotations.SdkProtectedApi;
1919
import software.amazon.awssdk.utils.Validate;

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/batchmanager/BatchBuffer.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,13 @@
2727
@SdkInternalApi
2828
public final class BatchBuffer<RequestT, ResponseT> {
2929
private final Object flushLock = new Object();
30-
3130
private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;
3231

32+
/**
33+
* Maximum number of elements that can be included in the BatchBuffer.
34+
*/
35+
private final int maxBufferSize;
36+
3337
// TODO: Figure out better name for nextId and nextBatchEntry.
3438
/**
3539
* Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
@@ -50,8 +54,9 @@ public final class BatchBuffer<RequestT, ResponseT> {
5054
*/
5155
private ScheduledFuture<?> scheduledFlush;
5256

53-
public BatchBuffer(ScheduledFuture<?> scheduledFlush) {
57+
public BatchBuffer(int maxBufferSize, ScheduledFuture<?> scheduledFlush) {
5458
this.idToBatchContext = new ConcurrentHashMap<>();
59+
this.maxBufferSize = maxBufferSize;
5560
this.nextId = 0;
5661
this.nextBatchEntry = 0;
5762
this.scheduledFlush = scheduledFlush;
@@ -94,13 +99,17 @@ public CompletableFuture<ResponseT> getResponse(String key) {
9499
return idToBatchContext.get(key).response();
95100
}
96101

97-
public BatchingExecutionContext<RequestT, ResponseT> put(RequestT request, CompletableFuture<ResponseT> response) {
102+
public void put(RequestT request, CompletableFuture<ResponseT> response) {
98103
synchronized (this) {
104+
if (idToBatchContext.size() == maxBufferSize) {
105+
throw new IllegalStateException("Reached MaxBufferSize of: " + maxBufferSize);
106+
}
107+
99108
if (nextId == Integer.MAX_VALUE) {
100109
nextId = 0;
101110
}
102111
String id = Integer.toString(nextId++);
103-
return idToBatchContext.put(id, new BatchingExecutionContext<>(request, response));
112+
idToBatchContext.put(id, new BatchingExecutionContext<>(request, response));
104113
}
105114
}
106115

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/batchmanager/BatchConfiguration.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,20 @@ public final class BatchConfiguration {
2525

2626
// TODO: Update these default values.
2727
private static final int DEFAULT_MAX_BATCH_ITEMS = 5;
28+
private static final int DEFAULT_MAX_BATCH_KEYS = 100;
29+
private static final int DEFAULT_MAX_BUFFER_SIZE = 500;
2830
private static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(200);
31+
2932
private final Integer maxBatchItems;
33+
private final Integer maxBatchKeys;
34+
private final Integer maxBufferSize;
3035
private final Duration maxBatchOpenInMs;
3136

3237
public BatchConfiguration(BatchOverrideConfiguration overrideConfiguration) {
3338
Optional<BatchOverrideConfiguration> configuration = Optional.ofNullable(overrideConfiguration);
3439
this.maxBatchItems = configuration.flatMap(BatchOverrideConfiguration::maxBatchItems).orElse(DEFAULT_MAX_BATCH_ITEMS);
40+
this.maxBatchKeys = configuration.flatMap(BatchOverrideConfiguration::maxBatchKeys).orElse(DEFAULT_MAX_BATCH_KEYS);
41+
this.maxBufferSize = configuration.flatMap(BatchOverrideConfiguration::maxBufferSize).orElse(DEFAULT_MAX_BUFFER_SIZE);
3542
this.maxBatchOpenInMs = configuration.flatMap(BatchOverrideConfiguration::maxBatchOpenInMs)
3643
.orElse(DEFAULT_MAX_BATCH_OPEN_IN_MS);
3744
}
@@ -43,4 +50,12 @@ public Duration maxBatchOpenInMs() {
4350
public int maxBatchItems() {
4451
return maxBatchItems;
4552
}
53+
54+
public int maxBatchKeys() {
55+
return maxBatchKeys;
56+
}
57+
58+
public int maxBufferSize() {
59+
return maxBufferSize;
60+
}
4661
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/batchmanager/BatchingMap.java

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,32 @@
3030
@SdkInternalApi
3131
public final class BatchingMap<RequestT, ResponseT> {
3232

33+
private final int maxBatchKeys;
34+
private final int maxBufferSize;
3335
private final Map<String, BatchBuffer<RequestT, ResponseT>> batchContextMap;
3436

35-
public BatchingMap() {
37+
public BatchingMap(int maxBatchKeys, int maxBufferSize) {
3638
this.batchContextMap = new ConcurrentHashMap<>();
39+
this.maxBatchKeys = maxBatchKeys;
40+
this.maxBufferSize = maxBufferSize;
3741
}
3842

43+
// put has a happens-before relation with removeBufferIfNeeded which is called by flushableRequests() and
44+
// flushableScheduledRequests. This is done because removeBufferIfNeeded removes the underlying buffer and so we want to
45+
// guarantee that we don't try to put items into a buffer that is removed.
3946
public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, RequestT request,
40-
CompletableFuture<ResponseT> response) {
41-
batchContextMap.computeIfAbsent(batchKey, k -> new BatchBuffer<>(scheduleFlush.get()))
47+
CompletableFuture<ResponseT> response) throws IllegalStateException {
48+
batchContextMap.computeIfAbsent(batchKey, k -> {
49+
if (batchContextMap.size() == maxBatchKeys) {
50+
throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys);
51+
}
52+
return new BatchBuffer<>(maxBufferSize, scheduleFlush.get());
53+
})
4254
.put(request, response);
4355
}
4456

45-
public void putScheduledFlush(String key, ScheduledFuture<?> scheduledFlush) {
46-
batchContextMap.get(key).putScheduledFlush(scheduledFlush);
57+
public void putScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
58+
batchContextMap.get(batchKey).putScheduledFlush(scheduledFlush);
4759
}
4860

4961
public void forEach(BiConsumer<String, BatchBuffer<RequestT, ResponseT>> action) {

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/batchmanager/DefaultBatchManager.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,17 @@
2424
import java.util.concurrent.ScheduledFuture;
2525
import java.util.concurrent.TimeUnit;
2626
import software.amazon.awssdk.annotations.SdkInternalApi;
27+
import software.amazon.awssdk.core.batchmanager.BatchAndSend;
28+
import software.amazon.awssdk.core.batchmanager.BatchKeyMapper;
2729
import software.amazon.awssdk.core.batchmanager.BatchManager;
2830
import software.amazon.awssdk.core.batchmanager.BatchOverrideConfiguration;
31+
import software.amazon.awssdk.core.batchmanager.BatchResponseMapper;
32+
import software.amazon.awssdk.core.batchmanager.IdentifiableMessage;
2933
import software.amazon.awssdk.utils.Validate;
3034

3135
@SdkInternalApi
3236
public final class DefaultBatchManager<RequestT, ResponseT, BatchResponseT> implements BatchManager<RequestT, ResponseT,
3337
BatchResponseT> {
34-
3538
private final int maxBatchItems;
3639
private final Duration maxBatchOpenInMs;
3740

@@ -65,7 +68,8 @@ public final class DefaultBatchManager<RequestT, ResponseT, BatchResponseT> impl
6568

6669
private DefaultBatchManager(DefaultBuilder<RequestT, ResponseT, BatchResponseT> builder) {
6770
BatchConfiguration batchConfiguration = new BatchConfiguration(builder.overrideConfiguration);
68-
this.requestsAndResponsesMaps = new BatchingMap<>();
71+
this.requestsAndResponsesMaps = new BatchingMap<>(batchConfiguration.maxBatchKeys(),
72+
batchConfiguration.maxBufferSize());
6973
this.maxBatchItems = batchConfiguration.maxBatchItems();
7074
this.maxBatchOpenInMs = batchConfiguration.maxBatchOpenInMs();
7175
this.batchFunction = Validate.notNull(builder.batchFunction, "Null batchFunction");
@@ -116,7 +120,6 @@ private void flushBuffer(String batchKey, Map<String, BatchingExecutionContext<R
116120
List<IdentifiableMessage<RequestT>> requestEntries = new ArrayList<>();
117121
flushableRequests.forEach((contextId, batchExecutionContext) ->
118122
requestEntries.add(new IdentifiableMessage<>(contextId, batchExecutionContext.request())));
119-
120123
if (!requestEntries.isEmpty()) {
121124
batchFunction.batchAndSend(requestEntries, batchKey)
122125
.whenComplete((result, ex) -> handleAndCompleteResponses(result, ex, flushableRequests));
@@ -129,14 +132,14 @@ private void handleAndCompleteResponses(BatchResponseT batchResult, Throwable ex
129132
requests.forEach((contextId, batchExecutionContext) -> batchExecutionContext.response()
130133
.completeExceptionally(exception));
131134
} else {
132-
List<IdentifiableMessage<ResponseT>> identifiedResponses = responseMapper.mapBatchResponse(batchResult);
133-
for (IdentifiableMessage<ResponseT> identifiedResponse : identifiedResponses) {
134-
String id = identifiedResponse.id();
135-
ResponseT response = identifiedResponse.message();
136-
requests.get(id)
137-
.response()
138-
.complete(response);
139-
}
135+
responseMapper.mapBatchResponse(batchResult)
136+
.forEach(
137+
response -> response.map(actualResponse -> requests.get(actualResponse.id())
138+
.response()
139+
.complete(actualResponse.message()),
140+
throwable -> requests.get(throwable.id())
141+
.response()
142+
.completeExceptionally(throwable.message())));
140143
}
141144
requests.clear();
142145
}

0 commit comments

Comments
 (0)