Skip to content

Commit a04379b

Browse files
authored
Internal classes and RequestBatchManager Impelementation (#5418)
* Added Internal classes required for BatchManager Implementation * Added Batch Send Implementation * Handled review comments * Handled review comments * Handled review comments * Made RequestBatchManager class a Abstract class * Checkstyle issues * Removed unused methods * New lines removed * Made public static to private state for sqsBatch functions * Constants added * Sonar cloud issues fixed * commit to check why test on codebuild * Increased Timeouts for get * Added abstract methods * Handled comments to remove Builders * Handled comments to take care when batchmanager closed while pending requests * Handled comments * Checkstyle issue
1 parent 72ee0dd commit a04379b

22 files changed

+2095
-6
lines changed

services/sqs/pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,5 +91,26 @@
9191
<version>${awsjavasdk.version}</version>
9292
<scope>test</scope>
9393
</dependency>
94+
<dependency>
95+
<groupId>org.junit.jupiter</groupId>
96+
<artifactId>junit-jupiter</artifactId>
97+
<scope>test</scope>
98+
</dependency>
99+
<dependency>
100+
<groupId>nl.jqno.equalsverifier</groupId>
101+
<artifactId>equalsverifier</artifactId>
102+
<scope>test</scope>
103+
</dependency>
104+
<dependency>
105+
<groupId>org.assertj</groupId>
106+
<artifactId>assertj-core</artifactId>
107+
<scope>test</scope>
108+
</dependency>
109+
<dependency>
110+
<groupId>org.mockito</groupId>
111+
<artifactId>mockito-junit-jupiter</artifactId>
112+
<scope>test</scope>
113+
</dependency>
114+
94115
</dependencies>
95116
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.internal.batchmanager;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import software.amazon.awssdk.annotations.SdkInternalApi;
20+
21+
@SdkInternalApi
22+
public final class BatchingExecutionContext<RequestT, ResponseT> {
23+
24+
private final RequestT request;
25+
private final CompletableFuture<ResponseT> response;
26+
27+
public BatchingExecutionContext(RequestT request, CompletableFuture<ResponseT> response) {
28+
this.request = request;
29+
this.response = response;
30+
}
31+
32+
public RequestT request() {
33+
return request;
34+
}
35+
36+
public CompletableFuture<ResponseT> response() {
37+
return response;
38+
}
39+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.internal.batchmanager;
17+
18+
import java.util.Map;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.ConcurrentHashMap;
21+
import java.util.concurrent.ScheduledFuture;
22+
import java.util.function.BiConsumer;
23+
import java.util.function.Supplier;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
26+
/**
27+
* Outer map maps a batchKey (ex. queueUrl, overrideConfig etc.) to a {@link RequestBatchBuffer}
28+
*
29+
* @param <RequestT> the type of an outgoing response
30+
*/
31+
@SdkInternalApi
32+
public final class BatchingMap<RequestT, ResponseT> {
33+
34+
private final int maxBatchKeys;
35+
private final int maxBufferSize;
36+
private final Map<String, RequestBatchBuffer<RequestT, ResponseT>> batchContextMap;
37+
38+
public BatchingMap(int maxBatchKeys, int maxBufferSize) {
39+
this.batchContextMap = new ConcurrentHashMap<>();
40+
this.maxBatchKeys = maxBatchKeys;
41+
this.maxBufferSize = maxBufferSize;
42+
}
43+
44+
public void put(String batchKey, Supplier<ScheduledFuture<?>> scheduleFlush, RequestT request,
45+
CompletableFuture<ResponseT> response) throws IllegalStateException {
46+
batchContextMap.computeIfAbsent(batchKey, k -> {
47+
if (batchContextMap.size() == maxBatchKeys) {
48+
throw new IllegalStateException("Reached MaxBatchKeys of: " + maxBatchKeys);
49+
}
50+
return new RequestBatchBuffer<>(maxBufferSize, scheduleFlush.get());
51+
}).put(request, response);
52+
}
53+
54+
public void putScheduledFlush(String batchKey, ScheduledFuture<?> scheduledFlush) {
55+
batchContextMap.get(batchKey).putScheduledFlush(scheduledFlush);
56+
}
57+
58+
public void forEach(BiConsumer<String, RequestBatchBuffer<RequestT, ResponseT>> action) {
59+
batchContextMap.forEach(action);
60+
}
61+
62+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests(String batchKey,
63+
int maxBatchItems) {
64+
return batchContextMap.get(batchKey).flushableRequests(maxBatchItems);
65+
}
66+
67+
public Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableScheduledRequests(String batchKey,
68+
int maxBatchItems) {
69+
return batchContextMap.get(batchKey).flushableScheduledRequests(maxBatchItems);
70+
}
71+
72+
public void cancelScheduledFlush(String batchKey) {
73+
batchContextMap.get(batchKey).cancelScheduledFlush();
74+
}
75+
76+
public void clear() {
77+
for (Map.Entry<String, RequestBatchBuffer<RequestT, ResponseT>> entry : batchContextMap.entrySet()) {
78+
String key = entry.getKey();
79+
entry.getValue().clear();
80+
batchContextMap.remove(key);
81+
}
82+
batchContextMap.clear();
83+
}
84+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.internal.batchmanager;
17+
18+
import java.util.ArrayList;
19+
import java.util.List;
20+
import java.util.Optional;
21+
import java.util.concurrent.CompletableFuture;
22+
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.stream.Collectors;
24+
import software.amazon.awssdk.annotations.SdkInternalApi;
25+
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
26+
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
27+
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
28+
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
29+
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
30+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
31+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
32+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
33+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResultEntry;
34+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
35+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse;
36+
import software.amazon.awssdk.services.sqs.model.SqsException;
37+
import software.amazon.awssdk.utils.Either;
38+
39+
@SdkInternalApi
40+
public class ChangeMessageVisibilityBatchManager extends RequestBatchManager<ChangeMessageVisibilityRequest,
41+
ChangeMessageVisibilityResponse,
42+
ChangeMessageVisibilityBatchResponse> {
43+
44+
private final SqsAsyncClient sqsAsyncClient;
45+
46+
protected ChangeMessageVisibilityBatchManager(BatchOverrideConfiguration overrideConfiguration,
47+
ScheduledExecutorService scheduledExecutor,
48+
SqsAsyncClient sqsAsyncClient) {
49+
super(overrideConfiguration, scheduledExecutor);
50+
this.sqsAsyncClient = sqsAsyncClient;
51+
}
52+
53+
private static ChangeMessageVisibilityBatchRequest createChangeMessageVisibilityBatchRequest(
54+
List<IdentifiableMessage<ChangeMessageVisibilityRequest>> identifiedRequests, String batchKey) {
55+
List<ChangeMessageVisibilityBatchRequestEntry> entries = identifiedRequests
56+
.stream()
57+
.map(identifiedRequest -> createChangeMessageVisibilityBatchRequestEntry(identifiedRequest.id(),
58+
identifiedRequest.message()))
59+
.collect(Collectors.toList());
60+
// Since requests are batched together according to a combination of their queueUrl and overrideConfiguration,
61+
// all requests must have the same overrideConfiguration so it is sufficient to retrieve it from the first
62+
// request.
63+
Optional<AwsRequestOverrideConfiguration> overrideConfiguration = identifiedRequests.get(0).message()
64+
.overrideConfiguration();
65+
return overrideConfiguration.map(
66+
config -> ChangeMessageVisibilityBatchRequest.builder()
67+
.queueUrl(batchKey)
68+
.overrideConfiguration(config)
69+
.entries(entries)
70+
.build())
71+
.orElseGet(() -> ChangeMessageVisibilityBatchRequest.builder()
72+
.queueUrl(batchKey)
73+
.entries(entries)
74+
.build());
75+
}
76+
77+
private static ChangeMessageVisibilityBatchRequestEntry createChangeMessageVisibilityBatchRequestEntry(
78+
String id,
79+
ChangeMessageVisibilityRequest request) {
80+
return ChangeMessageVisibilityBatchRequestEntry.builder().id(id).receiptHandle(request.receiptHandle())
81+
.visibilityTimeout(request.visibilityTimeout()).build();
82+
}
83+
84+
private static IdentifiableMessage<ChangeMessageVisibilityResponse> createChangeMessageVisibilityResponse(
85+
ChangeMessageVisibilityBatchResultEntry successfulEntry, ChangeMessageVisibilityBatchResponse batchResponse) {
86+
String key = successfulEntry.id();
87+
ChangeMessageVisibilityResponse.Builder builder = ChangeMessageVisibilityResponse.builder();
88+
if (batchResponse.responseMetadata() != null) {
89+
builder.responseMetadata(batchResponse.responseMetadata());
90+
}
91+
if (batchResponse.sdkHttpResponse() != null) {
92+
builder.sdkHttpResponse(batchResponse.sdkHttpResponse());
93+
}
94+
ChangeMessageVisibilityResponse response = builder.build();
95+
return new IdentifiableMessage<>(key, response);
96+
}
97+
98+
private static IdentifiableMessage<Throwable> changeMessageVisibilityCreateThrowable(BatchResultErrorEntry failedEntry) {
99+
String key = failedEntry.id();
100+
AwsErrorDetails errorDetailsBuilder = AwsErrorDetails.builder().errorCode(failedEntry.code())
101+
.errorMessage(failedEntry.message()).build();
102+
Throwable response = SqsException.builder().awsErrorDetails(errorDetailsBuilder).build();
103+
return new IdentifiableMessage<>(key, response);
104+
}
105+
106+
107+
108+
@Override
109+
protected CompletableFuture<ChangeMessageVisibilityBatchResponse> batchAndSend(
110+
List<IdentifiableMessage<ChangeMessageVisibilityRequest>> identifiedRequests, String batchKey) {
111+
ChangeMessageVisibilityBatchRequest batchRequest = createChangeMessageVisibilityBatchRequest(identifiedRequests,
112+
batchKey);
113+
return sqsAsyncClient.changeMessageVisibilityBatch(batchRequest);
114+
}
115+
116+
@Override
117+
protected String getBatchKey(ChangeMessageVisibilityRequest request) {
118+
return request.overrideConfiguration().map(overrideConfig -> request.queueUrl() + overrideConfig.hashCode())
119+
.orElseGet(request::queueUrl);
120+
}
121+
122+
@Override
123+
protected List<Either<IdentifiableMessage<ChangeMessageVisibilityResponse>,
124+
IdentifiableMessage<Throwable>>> mapBatchResponse(ChangeMessageVisibilityBatchResponse batchResponse) {
125+
126+
List<Either<IdentifiableMessage<ChangeMessageVisibilityResponse>, IdentifiableMessage<Throwable>>> mappedResponses =
127+
new ArrayList<>();
128+
batchResponse.successful().forEach(
129+
batchResponseEntry -> {
130+
IdentifiableMessage<ChangeMessageVisibilityResponse> response = createChangeMessageVisibilityResponse(
131+
batchResponseEntry, batchResponse);
132+
mappedResponses.add(Either.left(response));
133+
});
134+
batchResponse.failed().forEach(batchResponseEntry -> {
135+
IdentifiableMessage<Throwable> response = changeMessageVisibilityCreateThrowable(batchResponseEntry);
136+
mappedResponses.add(Either.right(response));
137+
});
138+
return mappedResponses;
139+
140+
}
141+
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,81 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import java.util.concurrent.CompletableFuture;
1819
import java.util.concurrent.ScheduledExecutorService;
1920
import software.amazon.awssdk.annotations.SdkInternalApi;
21+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2022
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2123
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
2224
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
25+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
26+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse;
27+
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
28+
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
29+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
30+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
31+
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
32+
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
2333
import software.amazon.awssdk.utils.Validate;
2434

2535
@SdkInternalApi
2636
public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
2737
// TODO : update the validation here while implementing this class in next PR
2838
private final SqsAsyncClient client;
29-
private final ScheduledExecutorService scheduledExecutor;
30-
private final BatchOverrideConfiguration overrideConfiguration;
39+
40+
private final SendMessageBatchManager sendMessageBatchManager;
41+
42+
private final DeleteMessageBatchManager deleteMessageBatchManager;
43+
44+
private final ChangeMessageVisibilityBatchManager changeMessageVisibilityBatchManager;
3145

3246
private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
3347
this.client = Validate.notNull(builder.client, "client cannot be null");
34-
this.scheduledExecutor = Validate.notNull(builder.scheduledExecutor, "scheduledExecutor cannot be null");
35-
// TODO : create overrideConfiguration with Default values if null
36-
this.overrideConfiguration = builder.overrideConfiguration;
48+
49+
ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor;
50+
51+
this.sendMessageBatchManager = new SendMessageBatchManager(builder.overrideConfiguration,
52+
scheduledExecutor,
53+
client);
54+
this.deleteMessageBatchManager = new DeleteMessageBatchManager(builder.overrideConfiguration,
55+
scheduledExecutor,
56+
client);
57+
this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(builder.overrideConfiguration,
58+
scheduledExecutor,
59+
client);
60+
//TODO : this will be updated while implementing the Receive Message Batch Manager
61+
}
62+
63+
@SdkTestInternalApi
64+
public DefaultSqsAsyncBatchManager(
65+
SqsAsyncClient client,
66+
SendMessageBatchManager sendMessageBatchManager,
67+
DeleteMessageBatchManager deleteMessageBatchManager,
68+
ChangeMessageVisibilityBatchManager changeMessageVisibilityBatchManager) {
69+
this.sendMessageBatchManager = sendMessageBatchManager;
70+
this.deleteMessageBatchManager = deleteMessageBatchManager;
71+
this.changeMessageVisibilityBatchManager = changeMessageVisibilityBatchManager;
72+
this.client = client;
73+
}
74+
75+
@Override
76+
public CompletableFuture<SendMessageResponse> sendMessage(SendMessageRequest request) {
77+
return sendMessageBatchManager.batchRequest(request);
78+
}
79+
80+
@Override
81+
public CompletableFuture<DeleteMessageResponse> deleteMessage(DeleteMessageRequest request) {
82+
return deleteMessageBatchManager.batchRequest(request);
83+
}
84+
85+
@Override
86+
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(ChangeMessageVisibilityRequest request) {
87+
return changeMessageVisibilityBatchManager.batchRequest(request);
88+
}
89+
90+
@Override
91+
public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRequest request) {
92+
return null;
3793
}
3894

3995
public static SqsAsyncBatchManager.Builder builder() {
@@ -42,6 +98,9 @@ public static SqsAsyncBatchManager.Builder builder() {
4298

4399
@Override
44100
public void close() {
101+
sendMessageBatchManager.close();
102+
deleteMessageBatchManager.close();
103+
changeMessageVisibilityBatchManager.close();
45104
}
46105

47106
public static final class DefaultBuilder implements SqsAsyncBatchManager.Builder {

0 commit comments

Comments
 (0)