Refactoring the core BatchManager to change how it handles batch entry failures. #2664
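At a glance, this change makes BatchResponseMapper return an Either per batch entry, so a failed entry completes only its own pending future exceptionally while the remaining entries complete normally. A minimal caller-side sketch of that contract (the EntryFailureSketch class, the pendingResponses map, and the generic names are illustrative, not the manager's actual fields):

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.core.batchmanager.IdentifiableMessage;
import software.amazon.awssdk.utils.Either;

public final class EntryFailureSketch {

    // Illustrative only: resolve each pending per-request future from the mapped batch response.
    static <ResponseT> void completePending(
            List<Either<IdentifiableMessage<ResponseT>, IdentifiableMessage<Throwable>>> mappedEntries,
            Map<String, CompletableFuture<ResponseT>> pendingResponses) {
        mappedEntries.forEach(entry -> entry.map(
            success -> pendingResponses.get(success.id()).complete(success.message()),
            failure -> pendingResponses.get(failure.id()).completeExceptionally(failure.message())));
    }
}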

@@ -13,11 +13,12 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchmanager;
package software.amazon.awssdk.core.batchmanager;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.batchmanager.IdentifiableMessage;

/**
* Takes a list of identified requests in addition to a destination and batches the requests into a batch request.
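A hedged sketch of what an implementation might look like for SQS, assuming BatchAndSend<RequestT, BatchResponseT> is a functional interface whose single method matches the batchAndSend(requestEntries, batchKey) call used in DefaultBatchManager further down; the SQS wiring and class name are illustrative, not part of this diff:

import java.util.List;
import java.util.stream.Collectors;
import software.amazon.awssdk.core.batchmanager.BatchAndSend;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;

public final class SqsBatchAndSendSketch {

    // Illustrative only: fold the buffered, identified requests into a single SendMessageBatch call.
    static BatchAndSend<SendMessageRequest, SendMessageBatchResponse> batchFunction(SqsAsyncClient sqs) {
        return (identifiedRequests, batchKey) -> {
            List<SendMessageBatchRequestEntry> entries =
                identifiedRequests.stream()
                                  .map(msg -> SendMessageBatchRequestEntry.builder()
                                                                          .id(msg.id())
                                                                          .messageBody(msg.message().messageBody())
                                                                          .build())
                                  .collect(Collectors.toList());
            // Assumes the batchKey carries the queue URL (illustrative; the key's contents are client-specific).
            return sqs.sendMessageBatch(SendMessageBatchRequest.builder()
                                                               .queueUrl(batchKey)
                                                               .entries(entries)
                                                               .build());
        };
    }
}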
@@ -13,7 +13,7 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchmanager;
package software.amazon.awssdk.core.batchmanager;

import software.amazon.awssdk.annotations.SdkProtectedApi;

@@ -17,9 +17,6 @@

import java.util.concurrent.ScheduledExecutorService;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.internal.batchmanager.BatchAndSend;
import software.amazon.awssdk.core.internal.batchmanager.BatchKeyMapper;
import software.amazon.awssdk.core.internal.batchmanager.BatchResponseMapper;

@SdkProtectedApi
public interface BatchManagerBuilder<RequestT, ResponseT, BatchResponseT, B> {
@@ -33,11 +33,15 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder<Batch
BatchOverrideConfiguration> {

private final Integer maxBatchItems;
private final Integer maxBatchKeys;
private final Integer maxBufferSize;
private final Duration maxBatchOpenInMs;

public BatchOverrideConfiguration(Builder builder) {
this.maxBatchItems = Validate.isPositiveOrNull(builder.maxBatchItems, "maxBatchItems");
this.maxBatchOpenInMs = Validate.isPositiveOrNull(builder.maxBatchOpenInMs, "maxBatchOpenInMs");
this.maxBatchKeys = Validate.isPositiveOrNull(builder.maxBatchKeys, "maxBatchKeys");
this.maxBufferSize = Validate.isPositiveOrNull(builder.maxBufferSize, "maxBufferSize");
}

public static Builder builder() {
@@ -51,6 +55,20 @@ public Optional<Integer> maxBatchItems() {
return Optional.ofNullable(maxBatchItems);
}

/**
* @return the optional maximum number of batchKeys to keep track of.
*/
public Optional<Integer> maxBatchKeys() {
return Optional.ofNullable(maxBatchKeys);
}

/**
* @return the optional maximum number of items allowed to be buffered for each batchKey.
*/
public Optional<Integer> maxBufferSize() {
return Optional.ofNullable(maxBufferSize);
}

/**
* @return the optional maximum amount of time (in milliseconds) that an outgoing call waits to be batched with messages of
* the same type.
@@ -62,13 +80,17 @@ public Optional<Duration> maxBatchOpenInMs() {
@Override
public Builder toBuilder() {
return new Builder().maxBatchItems(maxBatchItems)
.maxBatchKeys(maxBatchKeys)
.maxBufferSize(maxBufferSize)
.maxBatchOpenInMs(maxBatchOpenInMs);
}

@Override
public String toString() {
return ToString.builder("BatchOverrideConfiguration")
.add("maxBatchItems", maxBatchItems)
.add("maxBatchKeys", maxBatchKeys)
.add("maxBufferSize", maxBufferSize)
.add("maxBatchOpenInMs", maxBatchOpenInMs.toMillis())
.build();
}
@@ -87,19 +109,29 @@ public boolean equals(Object o) {
if (maxBatchItems != null ? !maxBatchItems.equals(that.maxBatchItems) : that.maxBatchItems != null) {
return false;
}
if (maxBatchKeys != null ? !maxBatchKeys.equals(that.maxBatchKeys) : that.maxBatchKeys != null) {
return false;
}
if (maxBufferSize != null ? !maxBufferSize.equals(that.maxBufferSize) : that.maxBufferSize != null) {
return false;
}
return maxBatchOpenInMs != null ? maxBatchOpenInMs.equals(that.maxBatchOpenInMs) : that.maxBatchOpenInMs == null;
}

@Override
public int hashCode() {
int result = maxBatchItems != null ? maxBatchItems.hashCode() : 0;
result = 31 * result + (maxBatchKeys != null ? maxBatchKeys.hashCode() : 0);
result = 31 * result + (maxBufferSize != null ? maxBufferSize.hashCode() : 0);
result = 31 * result + (maxBatchOpenInMs != null ? maxBatchOpenInMs.hashCode() : 0);
return result;
}

public static final class Builder implements CopyableBuilder<Builder, BatchOverrideConfiguration> {

private Integer maxBatchItems;
private Integer maxBatchKeys;
private Integer maxBufferSize;
private Duration maxBatchOpenInMs;

private Builder() {
@@ -116,6 +148,32 @@ public Builder maxBatchItems(Integer maxBatchItems) {
return this;
}

/**
* Define the maximum number of batchKeys to keep track of. A batchKey determines which requests are batched together
* and is calculated by the client based on the information in a request.
* <p>
* Ex. SQS determines a batchKey based on a request's queueUrl in combination with its overrideConfiguration, so
* requests with the same queueUrl and overrideConfiguration will have the same batchKey and be batched together.
*
* @param maxBatchKeys the new maxBatchKeys value.
* @return This object for method chaining.
*/
public Builder maxBatchKeys(Integer maxBatchKeys) {
this.maxBatchKeys = maxBatchKeys;
return this;
}

/**
* Define the maximum number of items allowed to be buffered for each batchKey.
*
* @param maxBufferSize the new maxBufferSize value.
* @return This object for method chaining.
*/
public Builder maxBufferSize(Integer maxBufferSize) {
this.maxBufferSize = maxBufferSize;
return this;
}

/**
* The maximum amount of time (in milliseconds) that an outgoing call waits for other requests before sending out a batch
* request.
@@ -128,6 +186,7 @@ public Builder maxBatchOpenInMs(Duration maxBatchOpenInMs) {
return this;
}


public BatchOverrideConfiguration build() {
return new BatchOverrideConfiguration(this);
}
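A small usage sketch of the expanded override configuration; the wrapper class is illustrative and the values simply mirror the defaults defined in BatchConfiguration below:

import java.time.Duration;
import software.amazon.awssdk.core.batchmanager.BatchOverrideConfiguration; // package assumed, consistent with the moves in this diff

public final class OverrideConfigurationSketch {

    // Illustrative values only; each setting is optional and falls back to the defaults in BatchConfiguration.
    static BatchOverrideConfiguration sample() {
        return BatchOverrideConfiguration.builder()
                                         .maxBatchItems(5)                         // entries per outgoing batch request
                                         .maxBatchKeys(100)                        // distinct batchKeys tracked at once
                                         .maxBufferSize(500)                       // buffered requests allowed per batchKey
                                         .maxBatchOpenInMs(Duration.ofMillis(200)) // how long a batch stays open
                                         .build();
    }
}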
@@ -13,10 +13,12 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchmanager;
package software.amazon.awssdk.core.batchmanager;

import java.util.List;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.batchmanager.IdentifiableMessage;
import software.amazon.awssdk.utils.Either;

/**
* Unpacks the batch response, then transforms individual entries to the appropriate response type. Each entry's batch ID
@@ -27,5 +29,5 @@
@FunctionalInterface
@SdkProtectedApi
public interface BatchResponseMapper<BatchResponseT, ResponseT> {
List<IdentifiableMessage<ResponseT>> mapBatchResponse(BatchResponseT batchResponse);
List<Either<IdentifiableMessage<ResponseT>, IdentifiableMessage<Throwable>>> mapBatchResponse(BatchResponseT batchResponse);
}
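With the new return type, a mapper can report per-entry failures alongside successes. A hedged sketch for an SQS-style SendMessageBatch response (the SQS model usage and the RuntimeException wrapping are assumptions for illustration, not part of this diff):

import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.core.batchmanager.BatchResponseMapper;
import software.amazon.awssdk.core.batchmanager.IdentifiableMessage;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
import software.amazon.awssdk.utils.Either;

public final class SqsResponseMapperSketch {

    // Illustrative only: successful entries become left values, failed entries become right values.
    static BatchResponseMapper<SendMessageBatchResponse, SendMessageResponse> responseMapper() {
        return batchResponse -> {
            List<Either<IdentifiableMessage<SendMessageResponse>, IdentifiableMessage<Throwable>>> mapped =
                new ArrayList<>();
            batchResponse.successful().forEach(entry ->
                mapped.add(Either.left(new IdentifiableMessage<>(
                    entry.id(),
                    SendMessageResponse.builder()
                                       .messageId(entry.messageId())
                                       .build()))));
            batchResponse.failed().forEach(entry ->
                mapped.add(Either.right(new IdentifiableMessage<>(
                    entry.id(),
                    new RuntimeException(entry.code() + ": " + entry.message())))));
            return mapped;
        };
    }
}

Keeping failures on the right side leaves the batch-level future successful, which matches how handleAndCompleteResponses consumes the mapper in DefaultBatchManager further down.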
@@ -13,7 +13,7 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchmanager;
package software.amazon.awssdk.core.batchmanager;

import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.utils.Validate;
@@ -27,9 +27,13 @@
@SdkInternalApi
public final class BatchBuffer<RequestT, ResponseT> {
private final Object flushLock = new Object();

private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;

/**
* Maximum number of elements that can be included in the BatchBuffer.
*/
private final int maxBufferSize;

// TODO: Figure out better name for nextId and nextBatchEntry.
/**
* Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
@@ -50,8 +54,9 @@ public final class BatchBuffer<RequestT, ResponseT> {
*/
private ScheduledFuture<?> scheduledFlush;

public BatchBuffer(ScheduledFuture<?> scheduledFlush) {
public BatchBuffer(int maxBufferSize, ScheduledFuture<?> scheduledFlush) {
this.idToBatchContext = new ConcurrentHashMap<>();
this.maxBufferSize = maxBufferSize;
this.nextId = 0;
this.nextBatchEntry = 0;
this.scheduledFlush = scheduledFlush;
@@ -94,13 +99,17 @@ public CompletableFuture<ResponseT> getResponse(String key) {
return idToBatchContext.get(key).response();
}

public BatchingExecutionContext<RequestT, ResponseT> put(RequestT request, CompletableFuture<ResponseT> response) {
public void put(RequestT request, CompletableFuture<ResponseT> response) {
synchronized (this) {
if (idToBatchContext.size() == maxBufferSize) {
throw new IllegalStateException("Reached MaxBufferSize of: " + maxBufferSize);
}

if (nextId == Integer.MAX_VALUE) {
nextId = 0;
}
String id = Integer.toString(nextId++);
return idToBatchContext.put(id, new BatchingExecutionContext<>(request, response));
idToBatchContext.put(id, new BatchingExecutionContext<>(request, response));
}
}

@@ -25,13 +25,20 @@ public final class BatchConfiguration {

// TODO: Update these default values.
private static final int DEFAULT_MAX_BATCH_ITEMS = 5;
private static final int DEFAULT_MAX_BATCH_KEYS = 100;
private static final int DEFAULT_MAX_BUFFER_SIZE = 500;
private static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(200);

private final Integer maxBatchItems;
private final Integer maxBatchKeys;
private final Integer maxBufferSize;
private final Duration maxBatchOpenInMs;

public BatchConfiguration(BatchOverrideConfiguration overrideConfiguration) {
Optional<BatchOverrideConfiguration> configuration = Optional.ofNullable(overrideConfiguration);
this.maxBatchItems = configuration.flatMap(BatchOverrideConfiguration::maxBatchItems).orElse(DEFAULT_MAX_BATCH_ITEMS);
this.maxBatchKeys = configuration.flatMap(BatchOverrideConfiguration::maxBatchKeys).orElse(DEFAULT_MAX_BATCH_KEYS);
this.maxBufferSize = configuration.flatMap(BatchOverrideConfiguration::maxBufferSize).orElse(DEFAULT_MAX_BUFFER_SIZE);
this.maxBatchOpenInMs = configuration.flatMap(BatchOverrideConfiguration::maxBatchOpenInMs)
.orElse(DEFAULT_MAX_BATCH_OPEN_IN_MS);
}
@@ -43,4 +50,12 @@ public Duration maxBatchOpenInMs() {
public int maxBatchItems() {
return maxBatchItems;
}

public int maxBatchKeys() {
return maxBatchKeys;
}

public int maxBufferSize() {
return maxBufferSize;
}
}
@@ -30,20 +30,32 @@
@SdkInternalApi
public final class BatchingMap<RequestT, ResponseT> {

private final int maxBatchKeys;
private final int maxBufferSize;
private final Map<String, BatchBuffer<RequestT, ResponseT>> batchContextMap;

public BatchingMap() {
public BatchingMap(int maxBatchKeys, int maxBufferSize) {
this.batchContextMap = new ConcurrentHashMap<>();
this.maxBatchKeys = maxBatchKeys;
this.maxBufferSize = maxBufferSize;
}

// put() has a happens-before relationship with removeBufferIfNeeded(), which is called by flushableRequests() and
// flushableScheduledRequests(). Since removeBufferIfNeeded() removes the underlying buffer, this guarantees that we
// never try to put items into a buffer that has already been removed.
public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, RequestT request,
CompletableFuture<ResponseT> response) {
batchContextMap.computeIfAbsent(batchKey, k -> new BatchBuffer<>(scheduleFlush.get()))
CompletableFuture<ResponseT> response) throws IllegalStateException {
batchContextMap.computeIfAbsent(batchKey, k -> {
if (batchContextMap.size() == maxBatchKeys) {
throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys);
}
return new BatchBuffer<>(maxBufferSize, scheduleFlush.get());
})
.put(request, response);
}

public void putScheduledFlush(String key, ScheduledFuture<?> scheduledFlush) {
batchContextMap.get(key).putScheduledFlush(scheduledFlush);
public void putScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
batchContextMap.get(batchKey).putScheduledFlush(scheduledFlush);
}

public void forEach(BiConsumer<String, BatchBuffer<RequestT, ResponseT>> action) {
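Both new limits surface as an IllegalStateException from put: BatchBuffer throws when a single batchKey's buffer reaches maxBufferSize, and BatchingMap throws when maxBatchKeys distinct keys already exist. A hypothetical sketch of how a caller could translate that overflow into a failed per-request future instead of letting it propagate; the helper below is not code from this diff:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.function.Supplier;
import software.amazon.awssdk.core.internal.batchmanager.BatchingMap; // internal package assumed unchanged by this diff

public final class OverflowHandlingSketch {

    // Hypothetical helper: buffer a request and convert an overflow (IllegalStateException from put,
    // thrown when maxBufferSize or maxBatchKeys is reached) into an exceptionally completed future.
    static <RequestT, ResponseT> CompletableFuture<ResponseT> tryPut(BatchingMap<RequestT, ResponseT> map,
                                                                     String batchKey,
                                                                     Supplier<ScheduledFuture<?>> scheduleFlush,
                                                                     RequestT request) {
        CompletableFuture<ResponseT> response = new CompletableFuture<>();
        try {
            map.put(batchKey, scheduleFlush, request, response);
        } catch (IllegalStateException overflow) {
            response.completeExceptionally(overflow);
        }
        return response;
    }
}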
@@ -24,14 +24,17 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.batchmanager.BatchAndSend;
import software.amazon.awssdk.core.batchmanager.BatchKeyMapper;
import software.amazon.awssdk.core.batchmanager.BatchManager;
import software.amazon.awssdk.core.batchmanager.BatchOverrideConfiguration;
import software.amazon.awssdk.core.batchmanager.BatchResponseMapper;
import software.amazon.awssdk.core.batchmanager.IdentifiableMessage;
import software.amazon.awssdk.utils.Validate;

@SdkInternalApi
public final class DefaultBatchManager<RequestT, ResponseT, BatchResponseT> implements BatchManager<RequestT, ResponseT,
BatchResponseT> {

private final int maxBatchItems;
private final Duration maxBatchOpenInMs;

@@ -65,7 +68,8 @@ public final class DefaultBatchManager<RequestT, ResponseT, BatchResponseT> impl

private DefaultBatchManager(DefaultBuilder<RequestT, ResponseT, BatchResponseT> builder) {
BatchConfiguration batchConfiguration = new BatchConfiguration(builder.overrideConfiguration);
this.requestsAndResponsesMaps = new BatchingMap<>();
this.requestsAndResponsesMaps = new BatchingMap<>(batchConfiguration.maxBatchKeys(),
batchConfiguration.maxBufferSize());
this.maxBatchItems = batchConfiguration.maxBatchItems();
this.maxBatchOpenInMs = batchConfiguration.maxBatchOpenInMs();
this.batchFunction = Validate.notNull(builder.batchFunction, "Null batchFunction");
@@ -116,7 +120,6 @@ private void flushBuffer(String batchKey, Map<String, BatchingExecutionContext<R
List<IdentifiableMessage<RequestT>> requestEntries = new ArrayList<>();
flushableRequests.forEach((contextId, batchExecutionContext) ->
requestEntries.add(new IdentifiableMessage<>(contextId, batchExecutionContext.request())));

if (!requestEntries.isEmpty()) {
batchFunction.batchAndSend(requestEntries, batchKey)
.whenComplete((result, ex) -> handleAndCompleteResponses(result, ex, flushableRequests));
@@ -129,14 +132,14 @@ private void handleAndCompleteResponses(BatchResponseT batchResult, Throwable ex
requests.forEach((contextId, batchExecutionContext) -> batchExecutionContext.response()
.completeExceptionally(exception));
} else {
List<IdentifiableMessage<ResponseT>> identifiedResponses = responseMapper.mapBatchResponse(batchResult);
for (IdentifiableMessage<ResponseT> identifiedResponse : identifiedResponses) {
String id = identifiedResponse.id();
ResponseT response = identifiedResponse.message();
requests.get(id)
.response()
.complete(response);
}
responseMapper.mapBatchResponse(batchResult)
.forEach(
response -> response.map(actualResponse -> requests.get(actualResponse.id())
.response()
.complete(actualResponse.message()),
throwable -> requests.get(throwable.id())
.response()
.completeExceptionally(throwable.message())));
}
requests.clear();
}