Skip to content

Commit 329566c

Browse files
16lim21Michael Li
andauthored
Implementing BatchManager in sdk-core (#2613)
* Adding batchBuffer and testing code * Adding sdk-core tests and fixing request to response mapping * Updating sdk-core tests with object to mimic batch request entry * Adding wrapper classes for functions * Making some changes for thread safety and updating tests * Refactoring code and how requests are cleared. Simplifying scheduling code as well * Changing scheduled flush code. Need to test in next commit * Adding more test cases and scheduling periodically instead of just once * Refactoring responseMap into a wrapper class * Fixing issues related to periodic scheduling. Need to test with multithreading * Using map of currentIds instead of a singular currentId. * Fixed race conditions related to cancelling scheduled buffer flush * Resetting cancellableFlush's flags after flush is completed * Multithreading test works. need to clean up code and recommit * Found another way to reset cancellableFlush flags. Also cleaning up code * Updating request batching tests with multi threading tests * Renamed to batchManager and cleaned up variable names to be more self-explanatory. Also refactored sdk-core tests * Refactored sdk-core tests * Fixing checkstyle issues * Fixing checkstyle issues in the tests as well as refactoring some tests * Updating and adding SQS batching tests and removing xml.bind * Using existing Md5Utils for Sqs batch tests * Naming and style changes to address PR comments * Refactoring tests to use maps instead of arrays for better request-to-response correlation checking * Name changes, refactoring code and addressing minor PR comments * Refactoring SQS integration tests to use identifiable responses * Combining request and response maps into one BatchingMap * Renaming generics as RequestT, ResponseT etc. * Refactoring tests to calculate batchGroupId from request and removing need for requestBufferCopy * Cleaning up names and methods * Adding the batchGroupId function that I forgot last time * Refactoring batchManager to use builder pattern and an overrideConfiguration to manage configuration values * Removing need for separate executor (just use scheduled executor) * Adding javadocs and renaming variables/classes to be more intuitive * Refactoring incrementing currentId to a separate BatchUtils class * Refactoring currentIds map and scheduledFlushes map into the BatchBuffer * Removing need to check if flush has executed but need to check if flush was a manual flush. Also adding batchUtils * Small changes to address github PR comments * Removing completionService and just using CompletableFuture.supplyAsync * Adding code to cover exception thrown from the sendAndBatch function * Adding exception test and fixing batchingExcution locking * Adding lock object back into batchingExecutionContext for both read and remove request methods * Addressing minor github PR comments (making classes final, cleaning up names etc.) * Just renaming cancelScheduledFlushIfNeeded method Co-authored-by: Michael Li <[email protected]>
1 parent d08a5bd commit 329566c

File tree

15 files changed

+1611
-0
lines changed

15 files changed

+1611
-0
lines changed
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core;
17+
18+
import java.time.Duration;
19+
import java.util.concurrent.ScheduledExecutorService;
20+
import software.amazon.awssdk.annotations.SdkPublicApi;
21+
import software.amazon.awssdk.core.internal.batchutilities.BatchManager;
22+
import software.amazon.awssdk.utils.ToString;
23+
import software.amazon.awssdk.utils.Validate;
24+
import software.amazon.awssdk.utils.builder.CopyableBuilder;
25+
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
26+
27+
/**
28+
* Configuration values for the {@link BatchManager}. All values are optional, and the default values will be used
29+
* if they are not specified.
30+
*/
31+
@SdkPublicApi
32+
public final class BatchOverrideConfiguration implements ToCopyableBuilder<BatchOverrideConfiguration.Builder,
33+
BatchOverrideConfiguration> {
34+
35+
private final Integer maxBatchItems;
36+
private final Duration maxBatchOpenInMs;
37+
private final ScheduledExecutorService scheduledExecutor;
38+
39+
public BatchOverrideConfiguration(Builder builder) {
40+
Validate.notNull(builder.maxBatchItems, "maxBatchItems cannot be null");
41+
this.maxBatchItems = Validate.isPositive(builder.maxBatchItems, "maxBatchItems");
42+
Validate.notNull(builder.maxBatchOpenInMs, "maxBatchOpenInMs cannot be null");
43+
this.maxBatchOpenInMs = Validate.isPositive(builder.maxBatchOpenInMs, "maxBachOpenInMs");
44+
this.scheduledExecutor = Validate.notNull(builder.scheduledExecutor, "scheduledExecutor cannot be null");
45+
}
46+
47+
public static Builder builder() {
48+
return new Builder();
49+
}
50+
51+
/**
52+
* @return the optional maximum number of messages that are batched together in a single request.
53+
*/
54+
public Integer maxBatchItems() {
55+
return maxBatchItems;
56+
}
57+
58+
/**
59+
* @return the optional maximum amount of time (in milliseconds) that an outgoing call waits to be batched with messages of
60+
* the same type.
61+
*/
62+
public Duration maxBatchOpenInMs() {
63+
return maxBatchOpenInMs;
64+
}
65+
66+
public ScheduledExecutorService scheduledExecutor() {
67+
return scheduledExecutor;
68+
}
69+
70+
@Override
71+
public Builder toBuilder() {
72+
return new Builder().maxBatchItems(maxBatchItems)
73+
.maxBatchOpenInMs(maxBatchOpenInMs)
74+
.scheduledExecutor(scheduledExecutor);
75+
}
76+
77+
@Override
78+
public String toString() {
79+
return ToString.builder("BatchOverrideConfiguration")
80+
.add("maxBatchItems", maxBatchItems)
81+
.add("maxBatchOpenInMs", maxBatchOpenInMs)
82+
.build();
83+
}
84+
85+
public static final class Builder implements CopyableBuilder<Builder, BatchOverrideConfiguration> {
86+
87+
private Integer maxBatchItems;
88+
private Duration maxBatchOpenInMs;
89+
private ScheduledExecutorService scheduledExecutor;
90+
91+
private Builder() {
92+
}
93+
94+
public Builder maxBatchItems(Integer maxBatchItems) {
95+
this.maxBatchItems = maxBatchItems;
96+
return this;
97+
}
98+
99+
public Builder maxBatchOpenInMs(Duration maxBatchOpenInMs) {
100+
this.maxBatchOpenInMs = maxBatchOpenInMs;
101+
return this;
102+
}
103+
104+
public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
105+
this.scheduledExecutor = scheduledExecutor;
106+
return this;
107+
}
108+
109+
public BatchOverrideConfiguration build() {
110+
return new BatchOverrideConfiguration(this);
111+
}
112+
}
113+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.batchutilities;
17+
18+
import java.util.List;
19+
import java.util.concurrent.CompletableFuture;
20+
import software.amazon.awssdk.annotations.SdkProtectedApi;
21+
22+
/**
23+
* Takes a list of identified requests in addition to a destination and batches the requests into a batch request.
24+
* It then sends the batch request and returns a CompletableFuture of the response.
25+
* @param <RequestT> the type of an outgoing request.
26+
* @param <BatchResponseT> the type of an outgoing batch response.
27+
*/
28+
@FunctionalInterface
29+
@SdkProtectedApi
30+
public interface BatchAndSend<RequestT, BatchResponseT> {
31+
CompletableFuture<BatchResponseT> batchAndSend(List<IdentifiableRequest<RequestT>> identifiedRequests, String batchGroupId);
32+
}
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.batchutilities;
17+
18+
import java.util.Collection;
19+
import java.util.Map;
20+
import java.util.Set;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.function.BiConsumer;
25+
import java.util.stream.Collectors;
26+
import software.amazon.awssdk.annotations.SdkInternalApi;
27+
28+
@SdkInternalApi
29+
public final class BatchBuffer<RequestT, ResponseT> {
30+
private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;
31+
private final AtomicInteger numRequests;
32+
33+
/**
34+
* Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
35+
* BatchingExecutionContext. For simplicity, the ID is just an integer that is incremented everytime a new request and
36+
* response pair is received.
37+
*/
38+
private final AtomicInteger nextId;
39+
40+
/**
41+
* The scheduled flush tasks associated with this batchBuffer.
42+
*/
43+
private ScheduledFlush scheduledFlush;
44+
45+
public BatchBuffer(ScheduledFlush scheduledFlush) {
46+
this.idToBatchContext = new ConcurrentHashMap<>();
47+
this.numRequests = new AtomicInteger(0);
48+
this.nextId = new AtomicInteger(0);
49+
this.scheduledFlush = scheduledFlush;
50+
}
51+
52+
public int size() {
53+
return idToBatchContext.size();
54+
}
55+
56+
public int requestSize() {
57+
return numRequests.get();
58+
}
59+
60+
public boolean hasRequests() {
61+
return numRequests.get() != 0;
62+
}
63+
64+
public boolean hasResponses() {
65+
return !idToBatchContext.isEmpty();
66+
}
67+
68+
public boolean containsKey(String key) {
69+
return idToBatchContext.containsKey(key);
70+
}
71+
72+
public RequestT getRequest(String key) {
73+
return idToBatchContext.get(key).request();
74+
}
75+
76+
public CompletableFuture<ResponseT> getResponse(String key) {
77+
return idToBatchContext.get(key).response();
78+
}
79+
80+
public ScheduledFlush getScheduledFlush() {
81+
return scheduledFlush;
82+
}
83+
84+
public BatchingExecutionContext<RequestT, ResponseT> put(RequestT request, CompletableFuture<ResponseT> response) {
85+
numRequests.getAndIncrement();
86+
String id = BatchUtils.getAndIncrementId(nextId);
87+
return idToBatchContext.put(id, new BatchingExecutionContext<>(request, response));
88+
}
89+
90+
public void putScheduledFlush(ScheduledFlush scheduledFlush) {
91+
this.scheduledFlush = scheduledFlush;
92+
}
93+
94+
public void cancelScheduledFlush() {
95+
scheduledFlush.cancel();
96+
}
97+
98+
public void removeRequest(String key) {
99+
if (idToBatchContext.get(key).removeRequest()) {
100+
numRequests.getAndDecrement();
101+
}
102+
}
103+
104+
public BatchingExecutionContext<RequestT, ResponseT> remove(String key) {
105+
return idToBatchContext.remove(key);
106+
}
107+
108+
public Collection<BatchingExecutionContext<RequestT, ResponseT>> values() {
109+
return idToBatchContext.values();
110+
}
111+
112+
public Collection<CompletableFuture<ResponseT>> responses() {
113+
return idToBatchContext.values()
114+
.stream()
115+
.map(BatchingExecutionContext::response)
116+
.collect(Collectors.toList());
117+
}
118+
119+
public Set<Map.Entry<String, BatchingExecutionContext<RequestT, ResponseT>>> entrySet() {
120+
return idToBatchContext.entrySet();
121+
}
122+
123+
public void clear() {
124+
numRequests.set(0);
125+
idToBatchContext.clear();
126+
}
127+
128+
public void forEach(BiConsumer<String, BatchingExecutionContext<RequestT, ResponseT>> action) {
129+
idToBatchContext.forEach(action);
130+
}
131+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.batchutilities;
17+
18+
import software.amazon.awssdk.annotations.SdkProtectedApi;
19+
20+
/**
21+
* Takes a request and extracts a batchGroupId as determined by the caller.
22+
* TODO: For right now, the batchKey is a String but this may change as needed in the future.
23+
* @param <RequestT> the request.
24+
*/
25+
@FunctionalInterface
26+
@SdkProtectedApi
27+
public interface BatchKeyMapper<RequestT> {
28+
String getBatchKey(RequestT request);
29+
}

0 commit comments

Comments
 (0)