Skip to content

Commit 2dc2d4c

Browse files
16lim21Michael Li
andauthored
Adding SqsBatchManager interfaces and Refactoring core BatchManager with interfaces (#2645)
* Creating BatchManager interface and refactoring BatchManager implementation into DefaultBatchManager * Updating builder in BatchManager interface and how it interacts with DefaultBatchManager * Adding a default BatchConfiguration and refactoring BatchOverrideConfiguration into a batchmanager package * Updating tests to be up to date to recent changes * Having DefaultBatchManager.Builder implement BatchManager.Builder and adding override tags * Adding SqsBatchManager interface * Adding client to SqsBatchManager.builder * Adding SqsBatchConfiguration and DefaultSqsBatchManager. Most of SqsBatchManager code was copied from the Sqs batch manager integration test * Fixing checkstyle issues * Removing implementation classes. Will re-add them in the next PR. * Adding SqsAsyncBatchManager * Adding javadocs to SqsBatchManager and SqsAsyncBatchManager * Adding more javadocs to public APIs and moving some from internal APIs to public ones. * Adding other batchable APIs for SQS * Renaming BatchManagerBuilder functions and fixing issues related to it * Interface default behavior throws UnsupportedOperation. Also fixing build failure * Fixing checkstyle issue Co-authored-by: Michael Li <[email protected]>
1 parent 00b2efc commit 2dc2d4c

File tree

11 files changed

+543
-111
lines changed

11 files changed

+543
-111
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.batchmanager;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import software.amazon.awssdk.annotations.SdkProtectedApi;
20+
import software.amazon.awssdk.core.internal.batchmanager.DefaultBatchManager;
21+
import software.amazon.awssdk.utils.SdkAutoCloseable;
22+
23+
/**
24+
* Generic batch manager that implements automatic request batching features.
25+
* @param <RequestT> the type of an outgoing request.
26+
* @param <ResponseT> the type of an outgoing response.
27+
* @param <BatchResponseT> the type of an outgoing batch response.
28+
*/
29+
@SdkProtectedApi
30+
public interface BatchManager<RequestT, ResponseT, BatchResponseT> extends SdkAutoCloseable {
31+
32+
/**
33+
* Buffers outgoing requests on the client and sends them as batch requests to the service. Requests are batched together
34+
* according to a batchKey and are sent periodically to the service as determined by a configured timeout. If the
35+
* number of requests for a batchKey reaches or exceeds a configured limit, then the requests are immediately flushed
36+
* and the timeout on the periodic flush is reset.
37+
* <p>
38+
* By default, messages are batched according to a service's maximum size for a batch request. These settings can be
39+
* customized via the configuration.
40+
*
41+
* @param request the outgoing request.
42+
* @return a CompletableFuture of the corresponding response.
43+
*/
44+
CompletableFuture<ResponseT> sendRequest(RequestT request);
45+
46+
/**
47+
* Creates a newly initialized BatchManager builder object.
48+
*
49+
* @param <RequestT> the type of an outgoing request.
50+
* @param <ResponseT> the type of an outgoing response.
51+
* @param <BatchResponseT> the type of an outgoing batch response.
52+
*/
53+
static <RequestT, ResponseT, BatchResponseT> DefaultBatchManager.Builder<RequestT, ResponseT, BatchResponseT> builder(
54+
Class<? extends RequestT> requestClass, Class<? extends ResponseT> responseClass,
55+
Class<? extends BatchResponseT> batchResponseClass) {
56+
return DefaultBatchManager.builder();
57+
}
58+
59+
/**
60+
* The BatchManager Builder.
61+
*
62+
* @param <RequestT> the type of an outgoing request.
63+
* @param <ResponseT> the type of an outgoing response.
64+
* @param <BatchResponseT> the type of an outgoing batch response.
65+
*/
66+
interface Builder<RequestT, ResponseT, BatchResponseT> extends BatchManagerBuilder<RequestT, ResponseT, BatchResponseT,
67+
BatchManager.Builder<RequestT, ResponseT, BatchResponseT>> {
68+
69+
/**
70+
* An immutable object that is created from the properties that have been set on the builder.
71+
* @return a reference to this object so that method calls can be chained together.
72+
*/
73+
BatchManager<RequestT, ResponseT, BatchResponseT> build();
74+
}
75+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.batchmanager;
17+
18+
import java.util.concurrent.ScheduledExecutorService;
19+
import software.amazon.awssdk.annotations.SdkProtectedApi;
20+
import software.amazon.awssdk.core.internal.batchmanager.BatchAndSend;
21+
import software.amazon.awssdk.core.internal.batchmanager.BatchKeyMapper;
22+
import software.amazon.awssdk.core.internal.batchmanager.BatchResponseMapper;
23+
24+
@SdkProtectedApi
25+
public interface BatchManagerBuilder<RequestT, ResponseT, BatchResponseT, B> {
26+
27+
/**
28+
* Defines overrides to the default BatchManager configuration that should be used.
29+
*
30+
* @param overrideConfiguration the override configuration.
31+
* @return a reference to this object so that method calls can be chained together.
32+
*/
33+
B overrideConfiguration(BatchOverrideConfiguration overrideConfiguration);
34+
35+
/**
36+
* Adds a {@link ScheduledExecutorService} to be used by the BatchManager to schedule periodic flushes of the underlying
37+
* buffers.
38+
*
39+
* @param scheduledExecutor the provided scheduled executor.
40+
* @return a reference to this object so that method calls can be chained together.
41+
*/
42+
B scheduledExecutor(ScheduledExecutorService scheduledExecutor);
43+
44+
/**
45+
* Adds a function that defines how requests should be batched together into the appropriate batch response.
46+
*
47+
* @param batchFunction the provided function.
48+
* @return a reference to this object so that method calls can be chained together.
49+
*/
50+
B batchFunction(BatchAndSend<RequestT, BatchResponseT> batchFunction);
51+
52+
/**
53+
* Adds a function that defines how a batch response should be extracted and transformed into its individual responses.
54+
*
55+
* @param responseMapper the provided function.
56+
* @return a reference to this object so that method calls can be chained together.
57+
*/
58+
B responseMapper(BatchResponseMapper<BatchResponseT, ResponseT> responseMapper);
59+
60+
/**
61+
* Adds a function that calculates an appropriate batchKey from a given request.
62+
*
63+
* @param batchKeyMapper the provided function.
64+
* @return a reference to this object so that method calls can be chained together.
65+
*/
66+
B batchKeyMapper(BatchKeyMapper<RequestT> batchKeyMapper);
67+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/BatchOverrideConfiguration.java renamed to core/sdk-core/src/main/java/software/amazon/awssdk/core/batchmanager/BatchOverrideConfiguration.java

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,19 +13,19 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.core;
16+
package software.amazon.awssdk.core.batchmanager;
1717

1818
import java.time.Duration;
19-
import java.util.concurrent.ScheduledExecutorService;
19+
import java.util.Optional;
2020
import software.amazon.awssdk.annotations.SdkPublicApi;
21-
import software.amazon.awssdk.core.internal.batchmanager.BatchManager;
21+
import software.amazon.awssdk.core.internal.batchmanager.DefaultBatchManager;
2222
import software.amazon.awssdk.utils.ToString;
2323
import software.amazon.awssdk.utils.Validate;
2424
import software.amazon.awssdk.utils.builder.CopyableBuilder;
2525
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
2626

2727
/**
28-
* Configuration values for the {@link BatchManager}. All values are optional, and the default values will be used
28+
* Configuration values for the {@link DefaultBatchManager}. All values are optional, and the default values will be used
2929
* if they are not specified.
3030
*/
3131
@SdkPublicApi
@@ -34,14 +34,10 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder<Batch
3434

3535
private final Integer maxBatchItems;
3636
private final Duration maxBatchOpenInMs;
37-
private final ScheduledExecutorService scheduledExecutor;
3837

3938
public BatchOverrideConfiguration(Builder builder) {
40-
Validate.notNull(builder.maxBatchItems, "maxBatchItems cannot be null");
41-
this.maxBatchItems = Validate.isPositive(builder.maxBatchItems, "maxBatchItems");
42-
Validate.notNull(builder.maxBatchOpenInMs, "maxBatchOpenInMs cannot be null");
43-
this.maxBatchOpenInMs = Validate.isPositive(builder.maxBatchOpenInMs, "maxBachOpenInMs");
44-
this.scheduledExecutor = Validate.notNull(builder.scheduledExecutor, "scheduledExecutor cannot be null");
39+
this.maxBatchItems = Validate.isPositiveOrNull(builder.maxBatchItems, "maxBatchItems");
40+
this.maxBatchOpenInMs = Validate.isPositiveOrNull(builder.maxBatchOpenInMs, "maxBachOpenInMs");
4541
}
4642

4743
public static Builder builder() {
@@ -51,27 +47,22 @@ public static Builder builder() {
5147
/**
5248
* @return the optional maximum number of messages that are batched together in a single request.
5349
*/
54-
public Integer maxBatchItems() {
55-
return maxBatchItems;
50+
public Optional<Integer> maxBatchItems() {
51+
return Optional.ofNullable(maxBatchItems);
5652
}
5753

5854
/**
5955
* @return the optional maximum amount of time (in milliseconds) that an outgoing call waits to be batched with messages of
6056
* the same type.
6157
*/
62-
public Duration maxBatchOpenInMs() {
63-
return maxBatchOpenInMs;
64-
}
65-
66-
public ScheduledExecutorService scheduledExecutor() {
67-
return scheduledExecutor;
58+
public Optional<Duration> maxBatchOpenInMs() {
59+
return Optional.ofNullable(maxBatchOpenInMs);
6860
}
6961

7062
@Override
7163
public Builder toBuilder() {
7264
return new Builder().maxBatchItems(maxBatchItems)
73-
.maxBatchOpenInMs(maxBatchOpenInMs)
74-
.scheduledExecutor(scheduledExecutor);
65+
.maxBatchOpenInMs(maxBatchOpenInMs);
7566
}
7667

7768
@Override
@@ -96,44 +87,47 @@ public boolean equals(Object o) {
9687
if (maxBatchItems != null ? !maxBatchItems.equals(that.maxBatchItems) : that.maxBatchItems != null) {
9788
return false;
9889
}
99-
if (maxBatchOpenInMs != null ? !maxBatchOpenInMs.equals(that.maxBatchOpenInMs) : that.maxBatchOpenInMs != null) {
100-
return false;
101-
}
102-
return scheduledExecutor.equals(that.scheduledExecutor);
90+
return maxBatchOpenInMs != null ? maxBatchOpenInMs.equals(that.maxBatchOpenInMs) : that.maxBatchOpenInMs == null;
10391
}
10492

10593
@Override
10694
public int hashCode() {
10795
int result = maxBatchItems != null ? maxBatchItems.hashCode() : 0;
10896
result = 31 * result + (maxBatchOpenInMs != null ? maxBatchOpenInMs.hashCode() : 0);
109-
result = 31 * result + scheduledExecutor.hashCode();
11097
return result;
11198
}
11299

113100
public static final class Builder implements CopyableBuilder<Builder, BatchOverrideConfiguration> {
114101

115102
private Integer maxBatchItems;
116103
private Duration maxBatchOpenInMs;
117-
private ScheduledExecutorService scheduledExecutor;
118104

119105
private Builder() {
120106
}
121107

108+
/**
109+
* Define the the maximum number of messages that are batched together in a single request.
110+
*
111+
* @param maxBatchItems The new maxBatchItems value.
112+
* @return This object for method chaining.
113+
*/
122114
public Builder maxBatchItems(Integer maxBatchItems) {
123115
this.maxBatchItems = maxBatchItems;
124116
return this;
125117
}
126118

119+
/**
120+
* The maximum amount of time (in milliseconds) that an outgoing call waits for other requests before sending out a batch
121+
* request.
122+
*
123+
* @param maxBatchOpenInMs The new maxBatchOpenInMs value.
124+
* @return This object for method chaining.
125+
*/
127126
public Builder maxBatchOpenInMs(Duration maxBatchOpenInMs) {
128127
this.maxBatchOpenInMs = maxBatchOpenInMs;
129128
return this;
130129
}
131130

132-
public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
133-
this.scheduledExecutor = scheduledExecutor;
134-
return this;
135-
}
136-
137131
public BatchOverrideConfiguration build() {
138132
return new BatchOverrideConfiguration(this);
139133
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.batchmanager;
17+
18+
import java.time.Duration;
19+
import java.util.Optional;
20+
import software.amazon.awssdk.annotations.SdkInternalApi;
21+
import software.amazon.awssdk.core.batchmanager.BatchOverrideConfiguration;
22+
23+
@SdkInternalApi
24+
public final class BatchConfiguration {
25+
26+
// TODO: Update these default values.
27+
private static final int DEFAULT_MAX_BATCH_ITEMS = 5;
28+
private static final Duration DEFAULT_MAX_BATCH_OPEN_IN_MS = Duration.ofMillis(200);
29+
private final Integer maxBatchItems;
30+
private final Duration maxBatchOpenInMs;
31+
32+
public BatchConfiguration(BatchOverrideConfiguration overrideConfiguration) {
33+
Optional<BatchOverrideConfiguration> configuration = Optional.ofNullable(overrideConfiguration);
34+
this.maxBatchItems = configuration.flatMap(BatchOverrideConfiguration::maxBatchItems).orElse(DEFAULT_MAX_BATCH_ITEMS);
35+
this.maxBatchOpenInMs = configuration.flatMap(BatchOverrideConfiguration::maxBatchOpenInMs)
36+
.orElse(DEFAULT_MAX_BATCH_OPEN_IN_MS);
37+
}
38+
39+
public Duration maxBatchOpenInMs() {
40+
return maxBatchOpenInMs;
41+
}
42+
43+
public int maxBatchItems() {
44+
return maxBatchItems;
45+
}
46+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/batchmanager/BatchKeyMapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import software.amazon.awssdk.annotations.SdkProtectedApi;
1919

2020
/**
21-
* Takes a request and extracts a batchGroupId as determined by the caller.
21+
* Takes a request and extracts a batchKey as determined by the caller.
2222
* TODO: For right now, the batchKey is a String but this may change as needed in the future.
2323
* @param <RequestT> the request.
2424
*/

0 commit comments

Comments
 (0)