Skip to content

Fixed issue relating to cancelling scheduled flush and removing requests. #2636

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
8abe56f
Adding batchBuffer and testing code
Jul 13, 2021
55d6849
Adding sdk-core tests and fixing request to response mapping
Jul 14, 2021
f6e485a
Updating sdk-core tests with object to mimic batch request entry
Jul 14, 2021
fa01f37
Adding wrapper classes for functions
Jul 14, 2021
7a2c0b9
Making some changes for thread safety and updating tests
Jul 15, 2021
be0d218
Refactoring code and how requests are cleared. Simplifying scheduling…
Jul 15, 2021
d89d35f
Changing scheduled flush code. Need to test in next commit
Jul 15, 2021
7191141
Adding more test cases and scheduling periodically instead of just once
Jul 15, 2021
6efeb64
Refactoring responseMap into a wrapper class
Jul 15, 2021
aec0d5c
Fixing issues related to periodic scheduling. Need to test with multi…
Jul 19, 2021
db83454
Using map of currentIds instead of a singular currentId.
Jul 19, 2021
2c55e01
Fixed race conditions related to cancelling scheduled buffer flush
Jul 20, 2021
4b77e0b
Resetting cancellableFlush's flags after flush is completed
Jul 20, 2021
fe6950a
Multithreading test works. need to clean up code and recommit
Jul 20, 2021
1e3d0d8
Found another way to reset cancellableFlush flags. Also cleaning up code
Jul 20, 2021
ec2f127
Updating request batching tests with multi threading tests
Jul 20, 2021
1f9bf88
Renamed to batchManager and cleaned up variable names to be more self…
Jul 21, 2021
b11b1b9
Refactored sdk-core tests
Jul 21, 2021
581a620
Fixing checkstyle issues
Jul 21, 2021
043619d
Fixing checkstyle issues in the tests as well as refactoring some tests
Jul 21, 2021
102424b
Updating and adding SQS batching tests and removing xml.bind
Jul 21, 2021
4b6ea92
Using existing Md5Utils for Sqs batch tests
Jul 21, 2021
d1efa49
Naming and style changes to address PR comments
Jul 22, 2021
4b4cb85
Refactoring tests to use maps instead of arrays for better request-to…
Jul 22, 2021
2528a65
Name changes, refactoring code and addressing minor PR comments
Jul 22, 2021
d76a4ff
Refactoring SQS integration tests to use identifiable responses
Jul 23, 2021
cff589a
Combining request and response maps into one BatchingMap
Jul 23, 2021
e8089e6
Renaming generics as RequestT, ResponseT etc.
Jul 23, 2021
f478261
Refactoring tests to calculate batchGroupId from request and removing…
Jul 23, 2021
cbfc34a
Cleaning up names and methods
Jul 26, 2021
5c94dd2
Adding the batchGroupId function that I forgot last time
Jul 26, 2021
915be6f
Refactoring batchManager to use builder pattern and an overrideConfig…
Jul 26, 2021
e387b21
Removing need for separate executor (just use scheduled executor)
Jul 26, 2021
0b17fcb
Adding javadocs and renaming variables/classes to be more intuitive
Jul 26, 2021
7849b42
Refactoring incrementing currentId to a separate BatchUtils class
Jul 26, 2021
660803b
Refactoring currentIds map and scheduledFlushes map into the BatchBuffer
Jul 26, 2021
d9a40c3
Removing need to check if flush has executed but need to check if flu…
Jul 26, 2021
826fab3
Small changes to address github PR comments
Jul 27, 2021
424dd62
Removing completionService and just using CompletableFuture.supplyAsync
Jul 27, 2021
d2725ec
Adding code to cover exception thrown from the sendAndBatch function
Jul 27, 2021
89d55f1
Adding exception test and fixing batchingExcution locking
Jul 27, 2021
137b340
Adding lock object back into batchingExecutionContext for both read a…
Jul 27, 2021
7ac8916
Addressing minor github PR comments (making classes final, cleaning u…
Jul 28, 2021
52e6316
Just renaming cancelScheduledFlushIfNeeded method
Jul 28, 2021
fcee2ad
Removing isManual
Jul 29, 2021
a752923
Fixing minor issues in tests
Jul 29, 2021
b7c1ad5
Adding clear() method to batchingMap and toString method for debuggin…
Jul 29, 2021
f5f2394
Hacky fix to removing requests properly (before was setting request a…
Jul 29, 2021
9c9082c
Adding tests for batchutils, IdentifiableRequest/Response, scheduledF…
Jul 30, 2021
5f7a187
Adding batchOverrideConfiguration tests and integer overflow batchUti…
Jul 30, 2021
841cc5d
Removing unnecessary methods in other classes
Jul 30, 2021
333ada6
Removing IdentifiableResponses and using IdentifiableMessage instead
Jul 30, 2021
a13312b
Adding more IdentifiableMessage test and updating SQSIntegrationTest …
Jul 30, 2021
3d65b2c
Renaming batchutilities package to batchmanager
Jul 30, 2021
666a6e4
Exposed APIs in BatchingMap so nothing interacts with batchBuffer dir…
Jul 30, 2021
3300f8e
Pass class types to builder instead of casting it
Aug 2, 2021
7302e5f
Adding a test to periodically send messages between multiple threads
Aug 2, 2021
d2581be
Removing requests immediately when batchBuffer can be flushed which i…
Aug 2, 2021
c676fbf
Cleaning up batchBuffer and batchingMap after previous change
Aug 2, 2021
b0257cb
Removing need for a cancellableFlush since requests are removed in a …
Aug 2, 2021
3b2508d
Removing logging and changing methods to return Map instead of Linked…
Aug 2, 2021
a7965d7
Merging changes and fixing checkstyle issue
Aug 3, 2021
3c8d25c
Addressing half of the PR comments. Have questions on the rest
Aug 4, 2021
81248cf
Adding hashCode and equals method to batchOverrideConfiguration and t…
Aug 4, 2021
3436043
Changing how nextBatchEntry works to be more intuitive. This removes …
Aug 4, 2021
e3d70fc
Fixing up TODOs
Aug 4, 2021
400f832
Fixing checkstyle issue
Aug 4, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.batchutilities.BatchManager;
import software.amazon.awssdk.core.internal.batchmanager.BatchManager;
import software.amazon.awssdk.utils.ToString;
import software.amazon.awssdk.utils.Validate;
import software.amazon.awssdk.utils.builder.CopyableBuilder;
Expand Down Expand Up @@ -78,10 +78,38 @@ public Builder toBuilder() {
public String toString() {
return ToString.builder("BatchOverrideConfiguration")
.add("maxBatchItems", maxBatchItems)
.add("maxBatchOpenInMs", maxBatchOpenInMs)
.add("maxBatchOpenInMs", maxBatchOpenInMs.toMillis())
.build();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}

BatchOverrideConfiguration that = (BatchOverrideConfiguration) o;

if (maxBatchItems != null ? !maxBatchItems.equals(that.maxBatchItems) : that.maxBatchItems != null) {
return false;
}
if (maxBatchOpenInMs != null ? !maxBatchOpenInMs.equals(that.maxBatchOpenInMs) : that.maxBatchOpenInMs != null) {
return false;
}
return scheduledExecutor.equals(that.scheduledExecutor);
}

@Override
public int hashCode() {
int result = maxBatchItems != null ? maxBatchItems.hashCode() : 0;
result = 31 * result + (maxBatchOpenInMs != null ? maxBatchOpenInMs.hashCode() : 0);
result = 31 * result + scheduledExecutor.hashCode();
return result;
}

public static final class Builder implements CopyableBuilder<Builder, BatchOverrideConfiguration> {

private Integer maxBatchItems;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;
package software.amazon.awssdk.core.internal.batchmanager;

import java.util.List;
import java.util.concurrent.CompletableFuture;
Expand All @@ -28,5 +28,5 @@
@FunctionalInterface
@SdkProtectedApi
public interface BatchAndSend<RequestT, BatchResponseT> {
CompletableFuture<BatchResponseT> batchAndSend(List<IdentifiableRequest<RequestT>> identifiedRequests, String batchGroupId);
CompletableFuture<BatchResponseT> batchAndSend(List<IdentifiableMessage<RequestT>> identifiedRequests, String batchGroupId);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchmanager;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.stream.Collectors;
import software.amazon.awssdk.annotations.SdkInternalApi;

@SdkInternalApi
public final class BatchBuffer<RequestT, ResponseT> {
private final Object flushLock = new Object();

private final Map<String, BatchingExecutionContext<RequestT, ResponseT>> idToBatchContext;

// TODO: Figure out better name for nextId and nextBatchEntry.
/**
* Batch entries in a batch request require a unique ID so nextId keeps track of the ID to assign to the next
* BatchingExecutionContext. For simplicity, the ID is just an integer that is incremented everytime a new request and
* response pair is received.
*/
private int nextId;

/**
* Keeps track of the ID of the next entry to be added in a batch request. This ID does not necessarily correlate to a
* request that already exists in the idToBatchContext map since it refers to the next entry (ex. if the last entry added
* to idToBatchContext had an id of 22, nextBatchEntry will have a value of 23).
*/
private int nextBatchEntry;

/**
* The scheduled flush tasks associated with this batchBuffer.
*/
private ScheduledFuture<?> scheduledFlush;

public BatchBuffer(ScheduledFuture<?> scheduledFlush) {
this.idToBatchContext = new ConcurrentHashMap<>();
this.nextId = 0;
this.nextBatchEntry = 0;
this.scheduledFlush = scheduledFlush;
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(int maxBatchItems) {
synchronized (flushLock) {
if (idToBatchContext.size() >= maxBatchItems) {
return extractFlushedEntries(maxBatchItems);
}
return new ConcurrentHashMap<>();
}
}

public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(int maxBatchItems) {
synchronized (flushLock) {
if (idToBatchContext.size() > 0) {
return extractFlushedEntries(maxBatchItems);
}
return new ConcurrentHashMap<>();
}
}

private Map<String, BatchingExecutionContext<RequestT, ResponseT>> extractFlushedEntries(int maxBatchItems) {
LinkedHashMap<String, BatchingExecutionContext<RequestT, ResponseT>> requestEntries = new LinkedHashMap<>();
String nextEntry;
while (requestEntries.size() < maxBatchItems && hasNextBatchEntry()) {
nextEntry = nextBatchEntry();
requestEntries.put(nextEntry, idToBatchContext.get(nextEntry));
idToBatchContext.remove(nextEntry);
}
return requestEntries;
}

public RequestT getRequest(String key) {
return idToBatchContext.get(key).request();
}

public CompletableFuture<ResponseT> getResponse(String key) {
return idToBatchContext.get(key).response();
}

public BatchingExecutionContext<RequestT, ResponseT> put(RequestT request, CompletableFuture<ResponseT> response) {
synchronized (this) {
if (nextId == Integer.MAX_VALUE) {
nextId = 0;
}
String id = Integer.toString(nextId++);
return idToBatchContext.put(id, new BatchingExecutionContext<>(request, response));
}
}

private boolean hasNextBatchEntry() {
return idToBatchContext.containsKey(Integer.toString(nextBatchEntry));
}

private String nextBatchEntry() {
if (nextBatchEntry == Integer.MAX_VALUE) {
nextBatchEntry = 0;
}
return Integer.toString(nextBatchEntry++);
}

public void putScheduledFlush(ScheduledFuture<?> scheduledFlush) {
this.scheduledFlush = scheduledFlush;
}

public void cancelScheduledFlush() {
scheduledFlush.cancel(false);
}

public Collection<CompletableFuture<ResponseT>> responses() {
return idToBatchContext.values()
.stream()
.map(BatchingExecutionContext::response)
.collect(Collectors.toList());
}

public void clear() {
idToBatchContext.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.batchutilities;
package software.amazon.awssdk.core.internal.batchmanager;

import software.amazon.awssdk.annotations.SdkProtectedApi;

Expand Down
Loading