Skip to content

Commit 318969b

Browse files
committed
Added Internal classes required for BatchManager Implementation
1 parent 0ea7f53 commit 318969b

22 files changed

+2052
-0
lines changed

services/sqs/pom.xml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,5 +91,21 @@
9191
<version>${awsjavasdk.version}</version>
9292
<scope>test</scope>
9393
</dependency>
94+
<dependency>
95+
<groupId>org.junit.jupiter</groupId>
96+
<artifactId>junit-jupiter</artifactId>
97+
<scope>test</scope>
98+
</dependency>
99+
<dependency>
100+
<groupId>nl.jqno.equalsverifier</groupId>
101+
<artifactId>equalsverifier</artifactId>
102+
<scope>test</scope>
103+
</dependency>
104+
<dependency>
105+
<groupId>org.assertj</groupId>
106+
<artifactId>assertj-core</artifactId>
107+
<scope>test</scope>
108+
</dependency>
109+
94110
</dependencies>
95111
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.internal.batchmanager;
17+
18+
import java.time.Duration;
19+
import java.util.Map;
20+
import java.util.concurrent.ScheduledExecutorService;
21+
import software.amazon.awssdk.annotations.SdkInternalApi;
22+
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
23+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchAndSend;
24+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchConfiguration;
25+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchKeyMapper;
26+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchManager;
27+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchManagerType;
28+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchResponseMapper;
29+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchingExecutionContext;
30+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchingMap;
31+
import software.amazon.awssdk.utils.Validate;
32+
33+
@SdkInternalApi
34+
public abstract class AbstractBatchManager<RequestT, ResponseT, BatchResponseT> implements BatchManager<RequestT, ResponseT,
35+
BatchResponseT> {
36+
private final int maxBatchItems;
37+
private final Duration maxBatchOpenInMs;
38+
39+
/**
40+
* A nested map that keeps track of customer requests and the corresponding responses to be completed. Requests are batched
41+
* together according to a batchKey that is calculated from the request by the service client.
42+
*/
43+
private final BatchingMap<RequestT, ResponseT> requestsAndResponsesMaps;
44+
45+
/**
46+
* Takes a list of identified requests in addition to a destination and batches the requests into a batch request. It then
47+
* sends the batch request and returns a CompletableFuture of the response.
48+
*/
49+
private final BatchAndSend<RequestT, BatchResponseT> batchFunction;
50+
51+
/**
52+
* Unpacks the batch response, then transforms individual entries to the appropriate response type. Each entry's batch ID is
53+
* mapped to the individual response entry.
54+
*/
55+
private final BatchResponseMapper<BatchResponseT, ResponseT> responseMapper;
56+
57+
/**
58+
* Takes a request and extracts a batchKey as determined by the caller.
59+
*/
60+
private final BatchKeyMapper<RequestT> batchKeyMapper;
61+
62+
/**
63+
* A scheduled executor that periodically schedules {@link #flushBuffer}.
64+
*/
65+
private final ScheduledExecutorService scheduledExecutor;
66+
67+
protected AbstractBatchManager(DefaultBuilder<RequestT, ResponseT, BatchResponseT> builder) {
68+
BatchConfiguration batchConfiguration = new BatchConfiguration(builder.overrideConfiguration);
69+
this.requestsAndResponsesMaps = new BatchingMap<>(batchConfiguration.maxBatchKeys(),
70+
batchConfiguration.maxBufferSize());
71+
this.maxBatchItems = batchConfiguration.maxBatchItems();
72+
this.maxBatchOpenInMs = batchConfiguration.maxBatchOpenInMs();
73+
this.batchFunction = Validate.notNull(builder.batchFunction, "Null batchFunction");
74+
this.responseMapper = Validate.notNull(builder.responseMapper, "Null responseMapper");
75+
this.batchKeyMapper = Validate.notNull(builder.batchKeyMapper, "Null batchKeyMapper");
76+
this.scheduledExecutor = Validate.notNull(builder.scheduledExecutor, "Null scheduledExecutor");
77+
}
78+
79+
public static <RequestT, ResponseT, BatchResponseT> Builder<RequestT, ResponseT, BatchResponseT> builder() {
80+
return new DefaultBuilder<>();
81+
}
82+
83+
84+
@Override
85+
public void close() {
86+
requestsAndResponsesMaps.forEach((batchKey, batchBuffer) -> {
87+
requestsAndResponsesMaps.cancelScheduledFlush(batchKey);
88+
Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests =
89+
requestsAndResponsesMaps.flushableRequests(batchKey, maxBatchItems);
90+
91+
while (!flushableRequests.isEmpty()) {
92+
flushBuffer(batchKey, flushableRequests);
93+
}
94+
});
95+
requestsAndResponsesMaps.clear();
96+
}
97+
98+
private void flushBuffer(String batchKey, Map<String, BatchingExecutionContext<RequestT, ResponseT>> flushableRequests) {
99+
}
100+
101+
public static final class DefaultBuilder<RequestT, ResponseT, BatchResponseT> implements Builder<RequestT, ResponseT,
102+
BatchResponseT> {
103+
104+
private BatchOverrideConfiguration overrideConfiguration;
105+
private ScheduledExecutorService scheduledExecutor;
106+
private BatchAndSend<RequestT, BatchResponseT> batchFunction;
107+
private BatchResponseMapper<BatchResponseT, ResponseT> responseMapper;
108+
private BatchKeyMapper<RequestT> batchKeyMapper;
109+
private BatchManagerType batchManagerType;
110+
111+
private DefaultBuilder() {
112+
}
113+
114+
@Override
115+
public Builder<RequestT, ResponseT, BatchResponseT> overrideConfiguration(BatchOverrideConfiguration
116+
overrideConfiguration) {
117+
this.overrideConfiguration = overrideConfiguration;
118+
return this;
119+
}
120+
121+
@Override
122+
public Builder<RequestT, ResponseT, BatchResponseT> scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
123+
this.scheduledExecutor = scheduledExecutor;
124+
return this;
125+
}
126+
127+
@Override
128+
public Builder<RequestT, ResponseT, BatchResponseT> batchFunction(BatchAndSend<RequestT, BatchResponseT>
129+
batchingFunction) {
130+
this.batchFunction = batchingFunction;
131+
return this;
132+
}
133+
134+
@Override
135+
public Builder<RequestT, ResponseT, BatchResponseT> responseMapper(
136+
BatchResponseMapper<BatchResponseT, ResponseT> responseMapper) {
137+
this.responseMapper = responseMapper;
138+
return this;
139+
}
140+
141+
@Override
142+
public Builder<RequestT, ResponseT, BatchResponseT> batchKeyMapper(BatchKeyMapper<RequestT> batchKeyMapper) {
143+
this.batchKeyMapper = batchKeyMapper;
144+
return this;
145+
}
146+
147+
@Override
148+
public Builder<RequestT, ResponseT, BatchResponseT> batchManagerType(BatchManagerType batchManagerType) {
149+
this.batchManagerType = batchManagerType;
150+
return this;
151+
}
152+
153+
public BatchManager<RequestT, ResponseT, BatchResponseT> build() {
154+
if (batchManagerType == BatchManagerType.RESPONSE) {
155+
return new ResponsesBatchManager<>(this);
156+
} else if (batchManagerType == BatchManagerType.REQUEST) {
157+
return new RequestsBatchManager<>(this);
158+
} else {
159+
throw new IllegalArgumentException("Type must be specified as either RESPONSE or REQUEST");
160+
}
161+
}
162+
163+
}
164+
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/DefaultSqsAsyncBatchManager.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,24 @@
1515

1616
package software.amazon.awssdk.services.sqs.internal.batchmanager;
1717

18+
import java.util.concurrent.CompletableFuture;
1819
import java.util.concurrent.ScheduledExecutorService;
1920
import software.amazon.awssdk.annotations.SdkInternalApi;
2021
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2122
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
2223
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
24+
import software.amazon.awssdk.services.sqs.internal.batchmanager.core.BatchManager;
25+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchResponse;
26+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityRequest;
27+
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityResponse;
28+
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
29+
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
30+
import software.amazon.awssdk.services.sqs.model.DeleteMessageResponse;
31+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
32+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
33+
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
34+
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
35+
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
2336
import software.amazon.awssdk.utils.Validate;
2437

2538
@SdkInternalApi
@@ -29,11 +42,45 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {
2942
private final ScheduledExecutorService scheduledExecutor;
3043
private final BatchOverrideConfiguration overrideConfiguration;
3144

45+
private final BatchManager<SendMessageRequest, SendMessageResponse, SendMessageBatchResponse> sendMessageBatchManager;
46+
47+
private final BatchManager<DeleteMessageRequest, DeleteMessageResponse, DeleteMessageBatchResponse> deleteMessageBatchManager;
48+
49+
private final BatchManager<ChangeMessageVisibilityRequest, ChangeMessageVisibilityResponse,
50+
ChangeMessageVisibilityBatchResponse> changeMessageVisibilityBatchManager;
51+
private final BatchManager<ReceiveMessageRequest, ReceiveMessageResponse,
52+
ReceiveMessageResponse> receiveMessageBatchManager;
53+
3254
private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
3355
this.client = Validate.notNull(builder.client, "client cannot be null");
3456
this.scheduledExecutor = Validate.notNull(builder.scheduledExecutor, "scheduledExecutor cannot be null");
3557
// TODO : create overrideConfiguration with Default values if null
3658
this.overrideConfiguration = builder.overrideConfiguration;
59+
60+
sendMessageBatchManager = null;
61+
deleteMessageBatchManager = null;
62+
changeMessageVisibilityBatchManager = null;
63+
receiveMessageBatchManager = null;
64+
}
65+
66+
@Override
67+
public CompletableFuture<SendMessageResponse> sendMessage(SendMessageRequest request) {
68+
return SqsAsyncBatchManager.super.sendMessage(request);
69+
}
70+
71+
@Override
72+
public CompletableFuture<DeleteMessageResponse> deleteMessage(DeleteMessageRequest request) {
73+
return SqsAsyncBatchManager.super.deleteMessage(request);
74+
}
75+
76+
@Override
77+
public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibility(ChangeMessageVisibilityRequest request) {
78+
return SqsAsyncBatchManager.super.changeMessageVisibility(request);
79+
}
80+
81+
@Override
82+
public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRequest request) {
83+
return SqsAsyncBatchManager.super.receiveMessage(request);
3784
}
3885

3986
public static SqsAsyncBatchManager.Builder builder() {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.internal.batchmanager;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import software.amazon.awssdk.annotations.SdkInternalApi;
20+
21+
@SdkInternalApi
22+
public class RequestsBatchManager<RequestT, ResponseT, BatchResponseT> extends
23+
AbstractBatchManager<RequestT, ResponseT, BatchResponseT> {
24+
25+
protected RequestsBatchManager(DefaultBuilder<RequestT, ResponseT, BatchResponseT> builder) {
26+
super(builder);
27+
}
28+
29+
@Override
30+
public CompletableFuture<ResponseT> sendRequest(RequestT request) {
31+
// Add implementation logic for sending a request
32+
return null;
33+
}
34+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.services.sqs.internal.batchmanager;
17+
18+
import java.util.concurrent.CompletableFuture;
19+
import software.amazon.awssdk.annotations.SdkInternalApi;
20+
21+
@SdkInternalApi
22+
public class ResponsesBatchManager<RequestT, ResponseT, BatchResponseT>
23+
extends AbstractBatchManager<RequestT, ResponseT, BatchResponseT> {
24+
25+
protected ResponsesBatchManager(DefaultBuilder<RequestT, ResponseT, BatchResponseT> builder) {
26+
super(builder);
27+
}
28+
29+
@Override
30+
public CompletableFuture<ResponseT> sendRequest(RequestT request) {
31+
// Add implementation logic for sending a request
32+
return null;
33+
}
34+
}

0 commit comments

Comments
 (0)