Skip to content

Commit 1a3b8e1

Browse files
committed
Added Batch Send Implementation
1 parent 615d6c0 commit 1a3b8e1

22 files changed

+957
-289
lines changed

services/sqs/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@
106106
<artifactId>assertj-core</artifactId>
107107
<scope>test</scope>
108108
</dependency>
109+
<dependency>
110+
<groupId>org.mockito</groupId>
111+
<artifactId>mockito-junit-jupiter</artifactId>
112+
<scope>test</scope>
113+
</dependency>
109114

110115
</dependencies>
111116
</project>

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java

Lines changed: 74 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,11 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import java.time.Duration;
1819
import java.util.concurrent.CompletableFuture;
1920
import java.util.concurrent.ScheduledExecutorService;
2021
import software.amazon.awssdk.annotations.SdkInternalApi;
22+
import software.amazon.awssdk.annotations.SdkTestInternalApi;
2123
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2224
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
2325
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
@@ -39,8 +41,6 @@
3941
public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
4042
// TODO : update the validation here while implementing this class in next PR
4143
private final SqsAsyncClient client;
42-
private final ScheduledExecutorService scheduledExecutor;
43-
private final BatchOverrideConfiguration overrideConfiguration;
4444

4545
private final BatchManager<SendMessageRequest, SendMessageResponse, SendMessageBatchResponse> sendMessageBatchManager;
4646

@@ -53,34 +53,69 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
5353

5454
private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
5555
this.client = Validate.notNull(builder.client, "client cannot be null");
56-
this.scheduledExecutor = Validate.notNull(builder.scheduledExecutor, "scheduledExecutor cannot be null");
57-
// TODO : create overrideConfiguration with Default values if null
58-
this.overrideConfiguration = builder.overrideConfiguration;
5956

60-
sendMessageBatchManager = null;
61-
deleteMessageBatchManager = null;
62-
changeMessageVisibilityBatchManager = null;
57+
ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor;
58+
59+
this.sendMessageBatchManager = BatchManager
60+
.requestBatchManagerBuilder(SendMessageRequest.class, SendMessageResponse.class, SendMessageBatchResponse.class)
61+
.batchFunction(SqsBatchFunctions.sendMessageBatchAsyncFunction(client))
62+
.responseMapper(SqsBatchFunctions.sendMessageResponseMapper())
63+
.batchKeyMapper(SqsBatchFunctions.sendMessageBatchKeyMapper())
64+
.overrideConfiguration(sendMessageConfig(builder.overrideConfiguration)).scheduledExecutor(scheduledExecutor)
65+
.build();
66+
this.deleteMessageBatchManager = BatchManager
67+
.requestBatchManagerBuilder(DeleteMessageRequest.class, DeleteMessageResponse.class, DeleteMessageBatchResponse.class)
68+
.batchFunction(SqsBatchFunctions.deleteMessageBatchAsyncFunction(client))
69+
.responseMapper(SqsBatchFunctions.deleteMessageResponseMapper())
70+
.batchKeyMapper(SqsBatchFunctions.deleteMessageBatchKeyMapper())
71+
.overrideConfiguration(deleteMessageConfig(builder.overrideConfiguration)).scheduledExecutor(scheduledExecutor)
72+
.build();
73+
this.changeMessageVisibilityBatchManager = BatchManager
74+
.requestBatchManagerBuilder(ChangeMessageVisibilityRequest.class, ChangeMessageVisibilityResponse.class,
75+
ChangeMessageVisibilityBatchResponse.class)
76+
.batchFunction(SqsBatchFunctions.changeMessageVisibilityBatchAsyncFunction(client))
77+
.responseMapper(SqsBatchFunctions.changeMessageVisibilityResponseMapper())
78+
.batchKeyMapper(SqsBatchFunctions.changeMessageVisibilityBatchKeyMapper())
79+
.overrideConfiguration(changeMessageVisibilityConfig(builder.overrideConfiguration))
80+
.scheduledExecutor(scheduledExecutor).build();
81+
82+
//TODO : this will be updated while implementing the Receive Message Batch Manager
6383
receiveMessageBatchManager = null;
6484
}
6585

86+
87+
@SdkTestInternalApi
88+
public DefaultSqsAsyncBatchManager(
89+
SqsAsyncClient client,
90+
BatchManager<SendMessageRequest, SendMessageResponse, SendMessageBatchResponse> sendMessageBatchManager,
91+
BatchManager<DeleteMessageRequest, DeleteMessageResponse, DeleteMessageBatchResponse> deleteMessageBatchManager,
92+
BatchManager<ChangeMessageVisibilityRequest, ChangeMessageVisibilityResponse,
93+
ChangeMessageVisibilityBatchResponse> changeMessageVisibilityBatchManager) {
94+
this.sendMessageBatchManager = sendMessageBatchManager;
95+
this.deleteMessageBatchManager = deleteMessageBatchManager;
96+
this.changeMessageVisibilityBatchManager = changeMessageVisibilityBatchManager;
97+
receiveMessageBatchManager = null;
98+
this.client = client;
99+
}
100+
66101
@Override
67102
public CompletableFuture<SendMessageResponse> sendMessage(SendMessageRequest request) {
68-
return SqsAsyncBatchManager.super.sendMessage(request);
103+
return sendMessageBatchManager.batchRequest(request);
69104
}
70105

71106
@Override
72107
public CompletableFuture<DeleteMessageResponse> deleteMessage(DeleteMessageRequest request) {
73-
return SqsAsyncBatchManager.super.deleteMessage(request);
108+
return deleteMessageBatchManager.batchRequest(request);
74109
}
75110

76111
@Override
77112
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(ChangeMessageVisibilityRequest request) {
78-
return SqsAsyncBatchManager.super.changeMessageVisibility(request);
113+
return changeMessageVisibilityBatchManager.batchRequest(request);
79114
}
80115

81116
@Override
82117
public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRequest request) {
83-
return SqsAsyncBatchManager.super.receiveMessage(request);
118+
return receiveMessageBatchManager.batchRequest(request);
84119
}
85120

86121
public static SqsAsyncBatchManager.Builder builder() {
@@ -89,6 +124,33 @@ public static SqsAsyncBatchManager.Builder builder() {
89124

90125
@Override
91126
public void close() {
127+
sendMessageBatchManager.close();
128+
deleteMessageBatchManager.close();
129+
changeMessageVisibilityBatchManager.close();
130+
}
131+
132+
private BatchOverrideConfiguration createConfig(BatchOverrideConfiguration overrideConfiguration) {
133+
BatchOverrideConfiguration.Builder config = BatchOverrideConfiguration.builder();
134+
if (overrideConfiguration == null) {
135+
config.maxBatchItems(10);
136+
config.maxBatchOpenInMs(Duration.ofMillis(200));
137+
} else {
138+
config.maxBatchItems(overrideConfiguration.maxBatchItems().orElse(10));
139+
config.maxBatchOpenInMs(overrideConfiguration.maxBatchOpenInMs().orElse(Duration.ofMillis(200)));
140+
}
141+
return config.build();
142+
}
143+
144+
private BatchOverrideConfiguration sendMessageConfig(BatchOverrideConfiguration overrideConfiguration) {
145+
return createConfig(overrideConfiguration);
146+
}
147+
148+
private BatchOverrideConfiguration deleteMessageConfig(BatchOverrideConfiguration overrideConfiguration) {
149+
return createConfig(overrideConfiguration);
150+
}
151+
152+
private BatchOverrideConfiguration changeMessageVisibilityConfig(BatchOverrideConfiguration overrideConfiguration) {
153+
return createConfig(overrideConfiguration);
92154
}
93155

94156
public static final class DefaultBuilder implements SqsAsyncBatchManager.Builder {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestsBatchManager.java

Lines changed: 0 additions & 34 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.internal.batchmanager;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.ScheduledExecutorService;
20+
import software.amazon.awssdk.annotations.SdkInternalApi;
21+
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
22+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchAndSend;
23+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchKeyMapper;
24+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchManager;
25+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchResponseMapper;
26+
27+
28+
// TODO: This class will be implemented in future PR , for now just added it to show how we have separate Batch managers for
29+
// Request And Response
30+
@SdkInternalApi
31+
public final class ResponseBatchManager<RequestT, ResponseT, BatchResponseT>
32+
implements BatchManager<RequestT, ResponseT, BatchResponseT> {
33+
34+
35+
private ResponseBatchManager(DefaultBuilder<RequestT, ResponseT, BatchResponseT> builder) {
36+
// TODO: Add implementation logic for sending a request in separate PR
37+
}
38+
39+
public static <ResponseT, BatchResponseT, RequestT> Builder<RequestT, ResponseT, BatchResponseT> builder() {
40+
return new DefaultBuilder<>();
41+
42+
}
43+
44+
45+
@Override
46+
public CompletableFuture<ResponseT> batchRequest(RequestT request) {
47+
// TODO: Add implementation logic for sending a request in separate PR
48+
return null;
49+
}
50+
51+
@Override
52+
public void close() {
53+
// TODO: Add implementation logic for sending a request in separate PR
54+
}
55+
56+
public static final class DefaultBuilder<RequestT, ResponseT, BatchResponseT> implements Builder<RequestT, ResponseT,
57+
BatchResponseT> {
58+
59+
public ResponseBatchManager<RequestT, ResponseT,
60+
BatchResponseT> build() {
61+
return new ResponseBatchManager<>(this);
62+
}
63+
64+
@Override
65+
public Builder<RequestT, ResponseT,
66+
BatchResponseT> overrideConfiguration(BatchOverrideConfiguration overrideConfiguration) {
67+
return this;
68+
}
69+
70+
@Override
71+
public Builder<RequestT, ResponseT, BatchResponseT> scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
72+
return this;
73+
}
74+
75+
@Override
76+
public Builder<RequestT, ResponseT,
77+
BatchResponseT> batchFunction(BatchAndSend<RequestT, BatchResponseT> batchFunction) {
78+
return this;
79+
}
80+
81+
@Override
82+
public Builder<RequestT, ResponseT,
83+
BatchResponseT> responseMapper(BatchResponseMapper<BatchResponseT, ResponseT> responseMapper) {
84+
return this;
85+
}
86+
87+
@Override
88+
public Builder<RequestT, ResponseT, BatchResponseT> batchKeyMapper(BatchKeyMapper<RequestT> batchKeyMapper) {
89+
return this;
90+
}
91+
}
92+
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponsesBatchManager.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsBatchFunctions.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.concurrent.CompletableFuture;
2222
import java.util.concurrent.Executor;
2323
import java.util.stream.Collectors;
24-
import software.amazon.awssdk.annotations.Generated;
2524
import software.amazon.awssdk.annotations.SdkInternalApi;
2625
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
2726
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
@@ -53,7 +52,6 @@
5352
import software.amazon.awssdk.services.sqs.model.SqsException;
5453
import software.amazon.awssdk.utils.Either;
5554

56-
@Generated("software.amazon.awssdk:codegen")
5755
@SdkInternalApi
5856
public final class SqsBatchFunctions {
5957
private SqsBatchFunctions() {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/core/BatchAndSend.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.util.List;
1919
import java.util.concurrent.CompletableFuture;
2020
import software.amazon.awssdk.annotations.SdkInternalApi;
21-
import software.amazon.awssdk.annotations.SdkProtectedApi;
2221

2322
/**
2423
* Takes a list of identified requests in addition to a batchKey and batches the requests into a batch request.

0 commit comments

Comments
 (0)