Skip to content

Commit 00b2efc

Browse files
16lim21Michael Li
andauthored
Fixed issue relating to cancelling scheduled flush and removing requests. (#2636)
* Adding batchBuffer and testing code * Adding sdk-core tests and fixing request to response mapping * Updating sdk-core tests with object to mimic batch request entry * Adding wrapper classes for functions * Making some changes for thread safety and updating tests * Refactoring code and how requests are cleared. Simplifying scheduling code as well * Changing scheduled flush code. Need to test in next commit * Adding more test cases and scheduling periodically instead of just once * Refactoring responseMap into a wrapper class * Fixing issues related to periodic scheduling. Need to test with multithreading * Using map of currentIds instead of a singular currentId. * Fixed race conditions related to cancelling scheduled buffer flush * Resetting cancellableFlush's flags after flush is completed * Multithreading test works. need to clean up code and recommit * Found another way to reset cancellableFlush flags. Also cleaning up code * Updating request batching tests with multi threading tests * Renamed to batchManager and cleaned up variable names to be more self-explanatory. Also refactored sdk-core tests * Refactored sdk-core tests * Fixing checkstyle issues * Fixing checkstyle issues in the tests as well as refactoring some tests * Updating and adding SQS batching tests and removing xml.bind * Using existing Md5Utils for Sqs batch tests * Naming and style changes to address PR comments * Refactoring tests to use maps instead of arrays for better request-to-response correlation checking * Name changes, refactoring code and addressing minor PR comments * Refactoring SQS integration tests to use identifiable responses * Combining request and response maps into one BatchingMap * Renaming generics as RequestT, ResponseT etc. * Refactoring tests to calculate batchGroupId from request and removing need for requestBufferCopy * Cleaning up names and methods * Adding the batchGroupId function that I forgot last time * Refactoring batchManager to use builder pattern and an overrideConfiguration to manage configuration values * Removing need for separate executor (just use scheduled executor) * Adding javadocs and renaming variables/classes to be more intuitive * Refactoring incrementing currentId to a separate BatchUtils class * Refactoring currentIds map and scheduledFlushes map into the BatchBuffer * Removing need to check if flush has executed but need to check if flush was a manual flush. Also adding batchUtils * Small changes to address github PR comments * Removing completionService and just using CompletableFuture.supplyAsync * Adding code to cover exception thrown from the sendAndBatch function * Adding exception test and fixing batchingExcution locking * Adding lock object back into batchingExecutionContext for both read and remove request methods * Addressing minor github PR comments (making classes final, cleaning up names etc.) * Just renaming cancelScheduledFlushIfNeeded method * Removing isManual * Fixing minor issues in tests * Adding clear() method to batchingMap and toString method for debugging to IdentifiableRequest * Hacky fix to removing requests properly (before was setting request as null but was not very efficient) * Adding tests for batchutils, IdentifiableRequest/Response, scheduledFlushes * Adding batchOverrideConfiguration tests and integer overflow batchUtils test * Removing unnecessary methods in other classes * Removing IdentifiableResponses and using IdentifiableMessage instead * Adding more IdentifiableMessage test and updating SQSIntegrationTest to use IdentifiableMessage * Renaming batchutilities package to batchmanager * Exposed APIs in BatchingMap so nothing interacts with batchBuffer directly * Pass class types to builder instead of casting it * Adding a test to periodically send messages between multiple threads * Removing requests immediately when batchBuffer can be flushed which is more intuitive * Cleaning up batchBuffer and batchingMap after previous change * Removing need for a cancellableFlush since requests are removed in a synchronized block in batchBuffer * Removing logging and changing methods to return Map instead of LinkedHashMap * Addressing half of the PR comments. Have questions on the rest * Adding hashCode and equals method to batchOverrideConfiguration and testing it. * Changing how nextBatchEntry works to be more intuitive. This removes need for a separate BatchUtils * Fixing up TODOs * Fixing checkstyle issue Co-authored-by: Michael Li <[email protected]>
1 parent 329566c commit 00b2efc

File tree

20 files changed

+596
-571
lines changed

20 files changed

+596
-571
lines changed

core/sdk-core/src/main/java/software/amazon/awssdk/core/BatchOverrideConfiguration.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import java.time.Duration;
1919
import java.util.concurrent.ScheduledExecutorService;
2020
import software.amazon.awssdk.annotations.SdkPublicApi;
21-
import software.amazon.awssdk.core.internal.batchutilities.BatchManager;
21+
import software.amazon.awssdk.core.internal.batchmanager.BatchManager;
2222
import software.amazon.awssdk.utils.ToString;
2323
import software.amazon.awssdk.utils.Validate;
2424
import software.amazon.awssdk.utils.builder.CopyableBuilder;
@@ -78,10 +78,38 @@ public Builder toBuilder() {
7878
public String toString() {
7979
return ToString.builder("BatchOverrideConfiguration")
8080
.add("maxBatchItems", maxBatchItems)
81-
.add("maxBatchOpenInMs", maxBatchOpenInMs)
81+
.add("maxBatchOpenInMs", maxBatchOpenInMs.toMillis())
8282
.build();
8383
}
8484

85+
@Override
86+
public boolean equals(Object o) {
87+
if (this == o) {
88+
return true;
89+
}
90+
if (o == null || getClass() != o.getClass()) {
91+
return false;
92+
}
93+
94+
BatchOverrideConfiguration that = (BatchOverrideConfiguration) o;
95+
96+
if (maxBatchItems != null ? !maxBatchItems.equals(that.maxBatchItems) : that.maxBatchItems != null) {
97+
return false;
98+
}
99+
if (maxBatchOpenInMs != null ? !maxBatchOpenInMs.equals(that.maxBatchOpenInMs) : that.maxBatchOpenInMs != null) {
100+
return false;
101+
}
102+
return scheduledExecutor.equals(that.scheduledExecutor);
103+
}
104+
105+
@Override
106+
public int hashCode() {
107+
int result = maxBatchItems != null ? maxBatchItems.hashCode() : 0;
108+
result = 31 * result + (maxBatchOpenInMs != null ? maxBatchOpenInMs.hashCode() : 0);
109+
result = 31 * result + scheduledExecutor.hashCode();
110+
return result;
111+
}
112+
85113
public static final class Builder implements CopyableBuilder<Builder, BatchOverrideConfiguration> {
86114

87115
private Integer maxBatchItems;

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/batchutilities/BatchAndSend.java renamed to core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/batchmanager/BatchAndSend.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.core.internal.batchutilities;
16+
package software.amazon.awssdk.core.internal.batchmanager;
1717

1818
import java.util.List;
1919
import java.util.concurrent.CompletableFuture;
@@ -28,5 +28,5 @@
2828
@FunctionalInterface
2929
@SdkProtectedApi
3030
public interface BatchAndSend<RequestT, BatchResponseT> {
31-
CompletableFuture<BatchResponseT> batchAndSend(List<IdentifiableRequest<RequestT>> identifiedRequests, String batchGroupId);
31+
CompletableFuture<BatchResponseT> batchAndSend(List<IdentifiableMessage<RequestT>> identifiedRequests, String batchGroupId);
3232
}
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.internal.batchmanager;
17+
18+
import java.util.Collection;
19+
import java.util.LinkedHashMap;
20+
import java.util.Map;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.ConcurrentHashMap;
23+
import java.util.concurrent.ScheduledFuture;
24+
import java.util.stream.Collectors;
25+
import software.amazon.awssdk.annotations.SdkInternalApi;
26+
27+
@SdkInternalApi
28+
public final class BatchBuffer<RequestT, ResponseT> {
29+
private final Object flushLock = new Object();
30+
31+
private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;
32+
33+
// TODO: Figure out better name for nextId and nextBatchEntry.
34+
/**
35+
* Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
36+
* BatchingExecutionContext. For simplicity, the ID is just an integer that is incremented everytime a new request and
37+
* response pair is received.
38+
*/
39+
private int nextId;
40+
41+
/**
42+
* Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a
43+
* request that already exists in the idToBatchContext map since it refers to the next entry (ex. if the last entry added
44+
* to idToBatchContext had an id of 22, nextBatchEntry will have a value of 23).
45+
*/
46+
private int nextBatchEntry;
47+
48+
/**
49+
* The scheduled flush tasks associated with this batchBuffer.
50+
*/
51+
private ScheduledFuture<?> scheduledFlush;
52+
53+
public BatchBuffer(ScheduledFuture<?> scheduledFlush) {
54+
this.idToBatchContext = new ConcurrentHashMap<>();
55+
this.nextId = 0;
56+
this.nextBatchEntry = 0;
57+
this.scheduledFlush = scheduledFlush;
58+
}
59+
60+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(int maxBatchItems) {
61+
synchronized (flushLock) {
62+
if (idToBatchContext.size() >= maxBatchItems) {
63+
return extractFlushedEntries(maxBatchItems);
64+
}
65+
return new ConcurrentHashMap<>();
66+
}
67+
}
68+
69+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(int maxBatchItems) {
70+
synchronized (flushLock) {
71+
if (idToBatchContext.size() > 0) {
72+
return extractFlushedEntries(maxBatchItems);
73+
}
74+
return new ConcurrentHashMap<>();
75+
}
76+
}
77+
78+
private Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractFlushedEntries(int maxBatchItems) {
79+
LinkedHashMap<String, BatchingExecutionContext<RequestT, ResponseT>> requestEntries = new LinkedHashMap<>();
80+
String nextEntry;
81+
while (requestEntries.size() < maxBatchItems && hasNextBatchEntry()) {
82+
nextEntry = nextBatchEntry();
83+
requestEntries.put(nextEntry, idToBatchContext.get(nextEntry));
84+
idToBatchContext.remove(nextEntry);
85+
}
86+
return requestEntries;
87+
}
88+
89+
public RequestT getRequest(String key) {
90+
return idToBatchContext.get(key).request();
91+
}
92+
93+
public CompletableFuture<ResponseT> getResponse(String key) {
94+
return idToBatchContext.get(key).response();
95+
}
96+
97+
public BatchingExecutionContext<RequestT, ResponseT> put(RequestT request, CompletableFuture<ResponseT> response) {
98+
synchronized (this) {
99+
if (nextId == Integer.MAX_VALUE) {
100+
nextId = 0;
101+
}
102+
String id = Integer.toString(nextId++);
103+
return idToBatchContext.put(id, new BatchingExecutionContext<>(request, response));
104+
}
105+
}
106+
107+
private boolean hasNextBatchEntry() {
108+
return idToBatchContext.containsKey(Integer.toString(nextBatchEntry));
109+
}
110+
111+
private String nextBatchEntry() {
112+
if (nextBatchEntry == Integer.MAX_VALUE) {
113+
nextBatchEntry = 0;
114+
}
115+
return Integer.toString(nextBatchEntry++);
116+
}
117+
118+
public void putScheduledFlush(ScheduledFuture<?> scheduledFlush) {
119+
this.scheduledFlush = scheduledFlush;
120+
}
121+
122+
public void cancelScheduledFlush() {
123+
scheduledFlush.cancel(false);
124+
}
125+
126+
public Collection<CompletableFuture<ResponseT>> responses() {
127+
return idToBatchContext.values()
128+
.stream()
129+
.map(BatchingExecutionContext::response)
130+
.collect(Collectors.toList());
131+
}
132+
133+
public void clear() {
134+
idToBatchContext.clear();
135+
}
136+
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
* permissions and limitations under the License.
1414
*/
1515

16-
package software.amazon.awssdk.core.internal.batchutilities;
16+
package software.amazon.awssdk.core.internal.batchmanager;
1717

1818
import software.amazon.awssdk.annotations.SdkProtectedApi;
1919

0 commit comments

Comments
 (0)