Skip to content

Commit 2ae8af6

Browse files
committed
Added Batch Send Implementation
1 parent 615d6c0 commit 2ae8af6

22 files changed

+968
-289
lines changed

services/sqs/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,11 @@
106106
<artifactId>assertj-core</artifactId>
107107
<scope>test</scope>
108108
</dependency>
109+
<dependency>
110+
<groupId>org.mockito</groupId>
111+
<artifactId>mockito-junit-jupiter</artifactId>
112+
<scope>test</scope>
113+
</dependency>
109114

110115
</dependencies>
111116
</project>

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java

Lines changed: 84 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import java.time.Duration;
1819
import java.util.concurrent.CompletableFuture;
1920
import java.util.concurrent.ScheduledExecutorService;
2021
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -39,8 +40,6 @@
3940
public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
4041
// TODO : update the validation here while implementing this class in next PR
4142
private final SqsAsyncClient client;
42-
private final ScheduledExecutorService scheduledExecutor;
43-
private final BatchOverrideConfiguration overrideConfiguration;
4443

4544
private final BatchManager<SendMessageRequest, SendMessageResponse, SendMessageBatchResponse> sendMessageBatchManager;
4645

@@ -53,34 +52,68 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
5352

5453
private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
5554
this.client = Validate.notNull(builder.client, "client cannot be null");
56-
this.scheduledExecutor = Validate.notNull(builder.scheduledExecutor, "scheduledExecutor cannot be null");
57-
// TODO : create overrideConfiguration with Default values if null
58-
this.overrideConfiguration = builder.overrideConfiguration;
5955

60-
sendMessageBatchManager = null;
61-
deleteMessageBatchManager = null;
62-
changeMessageVisibilityBatchManager = null;
56+
ScheduledExecutorService scheduledExecutor = builder.scheduledExecutor;
57+
58+
this.sendMessageBatchManager = BatchManager
59+
.requestBatchManagerBuilder(SendMessageRequest.class, SendMessageResponse.class, SendMessageBatchResponse.class)
60+
.batchFunction(SqsBatchFunctions.sendMessageBatchAsyncFunction(client))
61+
.responseMapper(SqsBatchFunctions.sendMessageResponseMapper())
62+
.batchKeyMapper(SqsBatchFunctions.sendMessageBatchKeyMapper())
63+
.overrideConfiguration(sendMessageConfig(builder.overrideConfiguration)).scheduledExecutor(scheduledExecutor)
64+
.build();
65+
this.deleteMessageBatchManager = BatchManager
66+
.requestBatchManagerBuilder(DeleteMessageRequest.class, DeleteMessageResponse.class, DeleteMessageBatchResponse.class)
67+
.batchFunction(SqsBatchFunctions.deleteMessageBatchAsyncFunction(client))
68+
.responseMapper(SqsBatchFunctions.deleteMessageResponseMapper())
69+
.batchKeyMapper(SqsBatchFunctions.deleteMessageBatchKeyMapper())
70+
.overrideConfiguration(deleteMessageConfig(builder.overrideConfiguration)).scheduledExecutor(scheduledExecutor)
71+
.build();
72+
this.changeMessageVisibilityBatchManager = BatchManager
73+
.requestBatchManagerBuilder(ChangeMessageVisibilityRequest.class, ChangeMessageVisibilityResponse.class,
74+
ChangeMessageVisibilityBatchResponse.class)
75+
.batchFunction(SqsBatchFunctions.changeMessageVisibilityBatchAsyncFunction(client))
76+
.responseMapper(SqsBatchFunctions.changeMessageVisibilityResponseMapper())
77+
.batchKeyMapper(SqsBatchFunctions.changeMessageVisibilityBatchKeyMapper())
78+
.overrideConfiguration(changeMessageVisibilityConfig(builder.overrideConfiguration))
79+
.scheduledExecutor(scheduledExecutor).build();
6380
receiveMessageBatchManager = null;
6481
}
6582

83+
84+
@SdkInternalApi
85+
public DefaultSqsAsyncBatchManager(
86+
SqsAsyncClient client,
87+
BatchManager<SendMessageRequest, SendMessageResponse, SendMessageBatchResponse> sendMessageBatchManager,
88+
BatchManager<DeleteMessageRequest, DeleteMessageResponse, DeleteMessageBatchResponse> deleteMessageBatchManager,
89+
BatchManager<ChangeMessageVisibilityRequest, ChangeMessageVisibilityResponse,
90+
ChangeMessageVisibilityBatchResponse> changeMessageVisibilityBatchManager) {
91+
this.sendMessageBatchManager = sendMessageBatchManager;
92+
this.deleteMessageBatchManager = deleteMessageBatchManager;
93+
this.changeMessageVisibilityBatchManager = changeMessageVisibilityBatchManager;
94+
receiveMessageBatchManager = null;
95+
96+
this.client = client;
97+
}
98+
6699
@Override
67100
public CompletableFuture<SendMessageResponse> sendMessage(SendMessageRequest request) {
68-
return SqsAsyncBatchManager.super.sendMessage(request);
101+
return sendMessageBatchManager.batchRequest(request);
69102
}
70103

71104
@Override
72105
public CompletableFuture<DeleteMessageResponse> deleteMessage(DeleteMessageRequest request) {
73-
return SqsAsyncBatchManager.super.deleteMessage(request);
106+
return deleteMessageBatchManager.batchRequest(request);
74107
}
75108

76109
@Override
77110
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(ChangeMessageVisibilityRequest request) {
78-
return SqsAsyncBatchManager.super.changeMessageVisibility(request);
111+
return changeMessageVisibilityBatchManager.batchRequest(request);
79112
}
80113

81114
@Override
82115
public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRequest request) {
83-
return SqsAsyncBatchManager.super.receiveMessage(request);
116+
return receiveMessageBatchManager.batchRequest(request);
84117
}
85118

86119
public static SqsAsyncBatchManager.Builder builder() {
@@ -89,6 +122,45 @@ public static SqsAsyncBatchManager.Builder builder() {
89122

90123
@Override
91124
public void close() {
125+
sendMessageBatchManager.close();
126+
deleteMessageBatchManager.close();
127+
changeMessageVisibilityBatchManager.close();
128+
}
129+
130+
private BatchOverrideConfiguration sendMessageConfig(BatchOverrideConfiguration overrideConfiguration) {
131+
BatchOverrideConfiguration.Builder config = BatchOverrideConfiguration.builder();
132+
if (overrideConfiguration == null) {
133+
config.maxBatchItems(10);
134+
config.maxBatchOpenInMs(Duration.ofMillis(200));
135+
} else {
136+
config.maxBatchItems(overrideConfiguration.maxBatchItems().orElse(10));
137+
config.maxBatchOpenInMs(overrideConfiguration.maxBatchOpenInMs().orElse(Duration.ofMillis(200)));
138+
}
139+
return config.build();
140+
}
141+
142+
private BatchOverrideConfiguration deleteMessageConfig(BatchOverrideConfiguration overrideConfiguration) {
143+
BatchOverrideConfiguration.Builder config = BatchOverrideConfiguration.builder();
144+
if (overrideConfiguration == null) {
145+
config.maxBatchItems(10);
146+
config.maxBatchOpenInMs(Duration.ofMillis(200));
147+
} else {
148+
config.maxBatchItems(overrideConfiguration.maxBatchItems().orElse(10));
149+
config.maxBatchOpenInMs(overrideConfiguration.maxBatchOpenInMs().orElse(Duration.ofMillis(200)));
150+
}
151+
return config.build();
152+
}
153+
154+
private BatchOverrideConfiguration changeMessageVisibilityConfig(BatchOverrideConfiguration overrideConfiguration) {
155+
BatchOverrideConfiguration.Builder config = BatchOverrideConfiguration.builder();
156+
if (overrideConfiguration == null) {
157+
config.maxBatchItems(10);
158+
config.maxBatchOpenInMs(Duration.ofMillis(200));
159+
} else {
160+
config.maxBatchItems(overrideConfiguration.maxBatchItems().orElse(10));
161+
config.maxBatchOpenInMs(overrideConfiguration.maxBatchOpenInMs().orElse(Duration.ofMillis(200)));
162+
}
163+
return config.build();
92164
}
93165

94166
public static final class DefaultBuilder implements SqsAsyncBatchManager.Builder {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestsBatchManager.java

Lines changed: 0 additions & 34 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.internal.batchmanager;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.ScheduledExecutorService;
20+
import software.amazon.awssdk.annotations.SdkInternalApi;
21+
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
22+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchAndSend;
23+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchKeyMapper;
24+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchManager;
25+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchResponseMapper;
26+
27+
28+
// TODO: This class will be implemented in future PR , for now just added it to show how we have separate Batch managers for
29+
// Request And Response
30+
@SdkInternalApi
31+
public final class ResponseBatchManager<RequestT, ResponseT, BatchResponseT>
32+
implements BatchManager<RequestT, ResponseT, BatchResponseT> {
33+
34+
35+
private ResponseBatchManager(DefaultBuilder<RequestT, ResponseT, BatchResponseT> builder) {
36+
// TODO: Add implementation logic for sending a request in separate PR
37+
}
38+
39+
public static <ResponseT, BatchResponseT, RequestT> Builder<RequestT, ResponseT, BatchResponseT> builder() {
40+
return new DefaultBuilder<>();
41+
42+
}
43+
44+
45+
@Override
46+
public CompletableFuture<ResponseT> batchRequest(RequestT request) {
47+
// TODO: Add implementation logic for sending a request in separate PR
48+
return null;
49+
}
50+
51+
@Override
52+
public void close() {
53+
// TODO: Add implementation logic for sending a request in separate PR
54+
}
55+
56+
public static final class DefaultBuilder<RequestT, ResponseT, BatchResponseT> implements Builder<RequestT, ResponseT,
57+
BatchResponseT> {
58+
59+
public ResponseBatchManager<RequestT, ResponseT,
60+
BatchResponseT> build() {
61+
return new ResponseBatchManager<>(this);
62+
}
63+
64+
@Override
65+
public Builder<RequestT, ResponseT,
66+
BatchResponseT> overrideConfiguration(BatchOverrideConfiguration overrideConfiguration) {
67+
return this;
68+
}
69+
70+
@Override
71+
public Builder<RequestT, ResponseT, BatchResponseT> scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
72+
return this;
73+
}
74+
75+
@Override
76+
public Builder<RequestT, ResponseT,
77+
BatchResponseT> batchFunction(BatchAndSend<RequestT, BatchResponseT> batchFunction) {
78+
return this;
79+
}
80+
81+
@Override
82+
public Builder<RequestT, ResponseT,
83+
BatchResponseT> responseMapper(BatchResponseMapper<BatchResponseT, ResponseT> responseMapper) {
84+
return this;
85+
}
86+
87+
@Override
88+
public Builder<RequestT, ResponseT, BatchResponseT> batchKeyMapper(BatchKeyMapper<RequestT> batchKeyMapper) {
89+
return this;
90+
}
91+
}
92+
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ResponsesBatchManager.java

Lines changed: 0 additions & 34 deletions
This file was deleted.

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/SqsBatchFunctions.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.concurrent.CompletableFuture;
2222
import java.util.concurrent.Executor;
2323
import java.util.stream.Collectors;
24-
import software.amazon.awssdk.annotations.Generated;
2524
import software.amazon.awssdk.annotations.SdkInternalApi;
2625
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
2726
import software.amazon.awssdk.awscore.exception.AwsErrorDetails;
@@ -53,7 +52,6 @@
5352
import software.amazon.awssdk.services.sqs.model.SqsException;
5453
import software.amazon.awssdk.utils.Either;
5554

56-
@Generated("software.amazon.awssdk:codegen")
5755
@SdkInternalApi
5856
public final class SqsBatchFunctions {
5957
private SqsBatchFunctions() {

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/core/BatchAndSend.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import java.util.List;
1919
import java.util.concurrent.CompletableFuture;
2020
import software.amazon.awssdk.annotations.SdkInternalApi;
21-
import software.amazon.awssdk.annotations.SdkProtectedApi;
2221

2322
/**
2423
* Takes a list of identified requests in addition to a batchKey and batches the requests into a batch request.

0 commit comments

Comments
 (0)