Skip to content

Adding SqsBatchManager interfaces and Refactoring core BatchManager with interfaces #2645

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.batchmanager;

import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.internal.batchmanager.DefaultBatchManager;
import software.amazon.awssdk.utils.SdkAutoCloseable;

/**
* Generic batch manager that implements automatic request batching features.
* @param <RequestT> the type of an outgoing request.
* @param <ResponseT> the type of an outgoing response.
* @param <BatchResponseT> the type of an outgoing batch response.
*/
@SdkProtectedApi
public interface BatchManager<RequestT, ResponseT, BatchResponseT> extends SdkAutoCloseable {

/**
* Buffers outgoing requests on the client and sends them as batch requests to the service. Requests are batched together
* according to a batchKey and are sent periodically to the service as determined by a configured timeout. If the
* number of requests for a batchKey reaches or exceeds a configured limit, then the requests are immediately flushed
* and the timeout on the periodic flush is reset.
* <p>
* By default, messages are batched according to a service's maximum size for a batch request. These settings can be
* customized via the configuration.
*
* @param request the outgoing request.
* @return a CompletableFuture of the corresponding response.
*/
CompletableFuture<ResponseT> sendRequest(RequestT request);

/**
* Creates a newly initialized BatchManager builder object.
*
* @param <RequestT> the type of an outgoing request.
* @param <ResponseT> the type of an outgoing response.
* @param <BatchResponseT> the type of an outgoing batch response.
*/
static <RequestT, ResponseT, BatchResponseT> DefaultBatchManager.Builder<RequestT, ResponseT, BatchResponseT> builder(
Class<? extends RequestT> requestClass, Class<? extends ResponseT> responseClass,
Class<? extends BatchResponseT> batchResponseClass) {
return DefaultBatchManager.builder();
}

/**
* The BatchManager Builder.
*
* @param <RequestT> the type of an outgoing request.
* @param <ResponseT> the type of an outgoing response.
* @param <BatchResponseT> the type of an outgoing batch response.
*/
interface Builder<RequestT, ResponseT, BatchResponseT> extends BatchManagerBuilder<RequestT, ResponseT, BatchResponseT,
BatchManager.Builder<RequestT, ResponseT, BatchResponseT>> {

/**
* An immutable object that is created from the properties that have been set on the builder.
* @return a reference to this object so that method calls can be chained together.
*/
BatchManager<RequestT, ResponseT, BatchResponseT> build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.batchmanager;

import java.util.concurrent.ScheduledExecutorService;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.core.internal.batchmanager.BatchAndSend;
import software.amazon.awssdk.core.internal.batchmanager.BatchKeyMapper;
import software.amazon.awssdk.core.internal.batchmanager.BatchResponseMapper;

@SdkProtectedApi
public interface BatchManagerBuilder<RequestT, ResponseT, BatchResponseT, B> {

/**
* Defines overrides to the default BatchManager configuration that should be used.
*
* @param overrideConfiguration the override configuration.
* @return a reference to this object so that method calls can be chained together.
*/
B overrideConfiguration(BatchOverrideConfiguration overrideConfiguration);

/**
* Adds a {@link ScheduledExecutorService} to be used by the BatchManager to schedule periodic flushes of the underlying
* buffers.
*
* @param scheduledExecutor the provided scheduled executor.
* @return a reference to this object so that method calls can be chained together.
*/
B scheduledExecutor(ScheduledExecutorService scheduledExecutor);

/**
* Adds a function that defines how requests should be batched together into the appropriate batch response.
*
* @param batchFunction the provided function.
* @return a reference to this object so that method calls can be chained together.
*/
B batchFunction(BatchAndSend<RequestT, BatchResponseT> batchFunction);

/**
* Adds a function that defines how a batch response should be extracted and transformed into its individual responses.
*
* @param responseMapper the provided function.
* @return a reference to this object so that method calls can be chained together.
*/
B responseMapper(BatchResponseMapper<BatchResponseT, ResponseT> responseMapper);

/**
* Adds a function that calculates an appropriate batchKey from a given request.
*
* @param batchKeyMapper the provided function.
* @return a reference to this object so that method calls can be chained together.
*/
B batchKeyMapper(BatchKeyMapper<RequestT> batchKeyMapper);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core;
package software.amazon.awssdk.core.batchmanager;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.Optional;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.batchmanager.BatchManager;
import software.amazon.awssdk.core.internal.batchmanager.DefaultBatchManager;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;

/**
* Configuration values for the {@link BatchManager}. All values are optional, and the default values will be used
* Configuration values for the {@link DefaultBatchManager}. All values are optional, and the default values will be used
* if they are not specified.
*/
@SdkPublicApi
Expand All @@ -34,14 +34,10 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder<Batch

private final Integer maxBatchItems;
private final Duration maxBatchOpenInMs;
private final ScheduledExecutorService scheduledExecutor;

public BatchOverrideConfiguration(Builder builder) {
Validate.notNull(builder.maxBatchItems, "maxBatchItems cannot be null");
this.maxBatchItems = Validate.isPositive(builder.maxBatchItems, "maxBatchItems");
Validate.notNull(builder.maxBatchOpenInMs, "maxBatchOpenInMs cannot be null");
this.maxBatchOpenInMs = Validate.isPositive(builder.maxBatchOpenInMs, "maxBachOpenInMs");
this.scheduledExecutor = Validate.notNull(builder.scheduledExecutor, "scheduledExecutor cannot be null");
this.maxBatchItems = Validate.isPositiveOrNull(builder.maxBatchItems, "maxBatchItems");
this.maxBatchOpenInMs = Validate.isPositiveOrNull(builder.maxBatchOpenInMs, "maxBachOpenInMs");
}

public static Builder builder() {
Expand All @@ -51,27 +47,22 @@ public static Builder builder() {
/**
* @return the optional maximum number of messages that are batched together in a single request.
*/
public Integer maxBatchItems() {
return maxBatchItems;
public Optional<Integer> maxBatchItems() {
return Optional.ofNullable(maxBatchItems);
}

/**
* @return the optional maximum amount of time (in milliseconds) that an outgoing call waits to be batched with messages of
* the same type.
*/
public Duration maxBatchOpenInMs() {
return maxBatchOpenInMs;
}

public ScheduledExecutorService scheduledExecutor() {
return scheduledExecutor;
public Optional<Duration> maxBatchOpenInMs() {
return Optional.ofNullable(maxBatchOpenInMs);
}

@Override
public Builder toBuilder() {
return new Builder().maxBatchItems(maxBatchItems)
.maxBatchOpenInMs(maxBatchOpenInMs)
.scheduledExecutor(scheduledExecutor);
.maxBatchOpenInMs(maxBatchOpenInMs);
}

@Override
Expand All @@ -96,44 +87,47 @@ public boolean equals(Object o) {
if (maxBatchItems != null ? !maxBatchItems.equals(that.maxBatchItems) : that.maxBatchItems != null) {
return false;
}
if (maxBatchOpenInMs != null ? !maxBatchOpenInMs.equals(that.maxBatchOpenInMs) : that.maxBatchOpenInMs != null) {
return false;
}
return scheduledExecutor.equals(that.scheduledExecutor);
return maxBatchOpenInMs != null ? maxBatchOpenInMs.equals(that.maxBatchOpenInMs) : that.maxBatchOpenInMs == null;
}

@Override
public int hashCode() {
int result = maxBatchItems != null ? maxBatchItems.hashCode() : 0;
result = 31 * result + (maxBatchOpenInMs != null ? maxBatchOpenInMs.hashCode() : 0);
result = 31 * result + scheduledExecutor.hashCode();
return result;
}

public static final class Builder implements CopyableBuilder<Builder, BatchOverrideConfiguration> {

private Integer maxBatchItems;
private Duration maxBatchOpenInMs;
private ScheduledExecutorService scheduledExecutor;

private Builder() {
}

/**
* Define the the maximum number of messages that are batched together in a single request.
*
* @param maxBatchItems The new maxBatchItems value.
* @return This object for method chaining.
*/
public Builder maxBatchItems(Integer maxBatchItems) {
this.maxBatchItems = maxBatchItems;
return this;
}

/**
* The maximum amount of time (in milliseconds) that an outgoing call waits for other requests before sending out a batch
* request.
*
* @param maxBatchOpenInMs The new maxBatchOpenInMs value.
* @return This object for method chaining.
*/
public Builder maxBatchOpenInMs(Duration maxBatchOpenInMs) {
this.maxBatchOpenInMs = maxBatchOpenInMs;
return this;
}

public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
this.scheduledExecutor = scheduledExecutor;
return this;
}

public BatchOverrideConfiguration build() {
return new BatchOverrideConfiguration(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchmanager;

import java.time.Duration;
import java.util.Optional;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.batchmanager.BatchOverrideConfiguration;

@SdkInternalApi
public final class BatchConfiguration {

// TODO: Update these default values.
private static final int DEFAULT_MAX_BATCH_ITEMS = 5;
private static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(200);
private final Integer maxBatchItems;
private final Duration maxBatchOpenInMs;

public BatchConfiguration(BatchOverrideConfiguration overrideConfiguration) {
Optional<BatchOverrideConfiguration> configuration = Optional.ofNullable(overrideConfiguration);
this.maxBatchItems = configuration.flatMap(BatchOverrideConfiguration::maxBatchItems).orElse(DEFAULT_MAX_BATCH_ITEMS);
this.maxBatchOpenInMs = configuration.flatMap(BatchOverrideConfiguration::maxBatchOpenInMs)
.orElse(DEFAULT_MAX_BATCH_OPEN_IN_MS);
}

public Duration maxBatchOpenInMs() {
return maxBatchOpenInMs;
}

public int maxBatchItems() {
return maxBatchItems;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import software.amazon.awssdk.annotations.SdkProtectedApi;

/**
* Takes a request and extracts a batchGroupId as determined by the caller.
* Takes a request and extracts a batchKey as determined by the caller.
* TODO: For right now, the batchKey is a String but this may change as needed in the future.
* @param <RequestT> the request.
*/
Expand Down
Loading