Skip to content

Commit 40bdde8

Browse files
joviegasdavidh44
andauthored
SQS Automatic Request Batching (#5580)
* Codegenerate BatchManager API under AsyncClient and Initail Interfaces for BatchManager (#5321) * Codegenerate BatchManager API under AsyncClient and Add empty initial Batchmanager interfaces and Implementations * Addressed review comments * Added Internal classes required for BatchManager Implementation * Revert "Added Internal classes required for BatchManager Implementation" This reverts commit 318969b. * Internal classes and RequestBatchManager Impelementation (#5418) * Added Internal classes required for BatchManager Implementation * Added Batch Send Implementation * Handled review comments * Handled review comments * Handled review comments * Made RequestBatchManager class a Abstract class * Checkstyle issues * Removed unused methods * New lines removed * Made public static to private state for sqsBatch functions * Constants added * Sonar cloud issues fixed * commit to check why test on codebuild * Increased Timeouts for get * Added abstract methods * Handled comments to remove Builders * Handled comments to take care when batchmanager closed while pending requests * Handled comments * Checkstyle issue * Added Consumer builders args for existing APIs of BatchManager (#5514) * Receive Batch Manager Implementation (#5488) * Add Recieve Buffer Queue And its related configuration * Update ReceiveBatch manager * Recieve Batch Manager Implementation * Receive Batch Manager Implemetation * Handled review comments * Checkstyle failure * Flsky test case fixed * Flaky test case fixed * Hamdled review comments * Handled comments * Removed ReceiveMessageCompletableFuture * SdkClosable implemented * Added ReceiveMessageBatchManager class for completeness * Checkstyle issues * Null checks * Handled comments from Zoe * Updated the defaults to 50ms same as V1 after surface area review * Revert "Updated the defaults to 50ms same as V1 after surface area review" This reverts commit e7d2295. * Bytes Based batching for SendMessageRequest Batching (#5540) * Initial changes * Initial changes 2 * Byte Based batching for SendMessage API * Byte Based batching for SendMessage API * Handled comments * Checkstyle issue * Add User Agent for Sqs Calls made using Automatic Batching Manager (#5546) * Add User Agent for Sqs Calls made using Automatic Batching Manager as hll/abm * Review comments * Update comments from PR 5488 (#5550) * Update comments of PR 5488 * Update comments from PR 5488 * Handled surface area review comments (#5563) * Initial version * Intermediate changes * Update after internal poll * ResponseCOnfiguration construction updated * RequestOverride configuration check added to Bypass batch manager * Handled review comments * Removed TODO since validations are handled in BatchPverrideConfiguration * Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages (#5571) * Fix issue where the Scheduled Timeout was incorrectly completing the futures with empty messages * Handled review comments * Integ test for Automatic Request Batching (#5576) * feat(sqs): add BatchManager for client-side request batching to Amazon SQS The new BatchManager allows for simple request batching using client-side buffering, improving cost efficiency and reducing the number of requests sent to Amazon SQS. The client-side buffering supports up to 10 requests per batch and is supported by the SqsAsyncClient. Batched requests, along with receive message polling, help to increase throughput. * Add check for scheduledExecutor such that it not null when creating SqsAsyncBatchManager (#5582) * Add check for scheduledExecutor such that it not null when creating SqsAsyncBatchManager * Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java Co-authored-by: David Ho <[email protected]> * Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java Co-authored-by: David Ho <[email protected]> * Update services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/SqsAsyncBatchManagerBuilderTest.java Co-authored-by: David Ho <[email protected]> --------- Co-authored-by: David Ho <[email protected]> * Updating Timeouts in gets so that we dont wait infinitely --------- Co-authored-by: David Ho <[email protected]>
1 parent 28cfca9 commit 40bdde8

File tree

64 files changed

+7154
-7
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+7154
-7
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "Amazon Simple Queue Service",
4+
"contributor": "",
5+
"description": "The AWS SDK for Java now supports a new `BatchManager` for Amazon Simple Queue Service (SQS), allowing for client-side request batching with `SqsAsyncClient`. This feature improves cost efficiency by buffering up to 10 requests before sending them as a batch to SQS. The implementation also supports receive message polling, which further enhances throughput by minimizing the number of individual requests sent. The batched requests help to optimize performance and reduce the costs associated with using Amazon SQS."
6+
}

codegen/src/main/java/software/amazon/awssdk/codegen/AddMetadata.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public static Metadata constructMetadata(ServiceModel serviceModel,
6060
.withBaseBuilder(String.format(Constant.BASE_BUILDER_CLASS_NAME_PATTERN, serviceName))
6161
.withDocumentation(serviceModel.getDocumentation())
6262
.withServiceAbbreviation(serviceMetadata.getServiceAbbreviation())
63+
.withBatchmanagerPackageName(namingStrategy.getBatchManagerPackageName(serviceName))
6364
.withServiceFullName(serviceMetadata.getServiceFullName())
6465
.withServiceName(serviceName)
6566
.withSyncClient(String.format(Constant.SYNC_CLIENT_CLASS_NAME_PATTERN, serviceName))

codegen/src/main/java/software/amazon/awssdk/codegen/internal/Constant.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ public final class Constant {
7676

7777
public static final String PACKAGE_NAME_SMOKE_TEST_PATTERN = "%s.smoketests";
7878

79+
public static final String PACKAGE_NAME_BATCHMANAGER_PATTERN = "%s.batchmanager";
80+
7981
public static final String PACKAGE_NAME_CUSTOM_AUTH_PATTERN = "%s.auth";
8082

8183
public static final String AUTH_POLICY_ENUM_CLASS_DIR = "software/amazon/awssdk/auth/policy/actions";

codegen/src/main/java/software/amazon/awssdk/codegen/model/config/customization/CustomizationConfig.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,11 @@ public class CustomizationConfig {
342342
*/
343343
private Map<String, PreClientExecutionRequestCustomizer> preClientExecutionRequestCustomizer;
344344

345+
/**
346+
* A boolean flag to indicate if Automatic Batch Request is supported.
347+
*/
348+
private boolean batchManagerSupported;
349+
345350
private CustomizationConfig() {
346351
}
347352

@@ -901,4 +906,12 @@ public void setPreClientExecutionRequestCustomizer(Map<String, PreClientExecutio
901906
this.preClientExecutionRequestCustomizer = preClientExecutionRequestCustomizer;
902907
}
903908

909+
public boolean getBatchManagerSupported() {
910+
return batchManagerSupported;
911+
}
912+
913+
public void setBatchManagerSupported(boolean batchManagerSupported) {
914+
this.batchManagerSupported = batchManagerSupported;
915+
}
916+
904917
}

codegen/src/main/java/software/amazon/awssdk/codegen/model/intermediate/Metadata.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,8 @@ public class Metadata {
7171

7272
private String waitersPackageName;
7373

74+
private String batchManagerPackageName;
75+
7476
private String endpointRulesPackageName;
7577

7678
private String authSchemePackageName;
@@ -789,4 +791,22 @@ public String getFullInternalJmesPathPackageName() {
789791
return joinPackageNames(getFullJmesPathPackageName(), "internal");
790792
}
791793

794+
public Metadata withBatchmanagerPackageName(String batchmanagerPackageName) {
795+
setBatchManagerPackageName(batchmanagerPackageName);
796+
return this;
797+
}
798+
799+
800+
public String getBatchManagerPackageName() {
801+
return batchManagerPackageName;
802+
}
803+
804+
public void setBatchManagerPackageName(String batchManagerPackageName) {
805+
this.batchManagerPackageName = batchManagerPackageName;
806+
}
807+
808+
public String getFullBatchManagerPackageName() {
809+
return joinPackageNames(rootPackageName, getBatchManagerPackageName());
810+
}
811+
792812
}

codegen/src/main/java/software/amazon/awssdk/codegen/naming/DefaultNamingStrategy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,11 @@ public String getJmesPathPackageName(String serviceName) {
189189
return getCustomizedPackageName(concatServiceNameIfShareModel(serviceName), Constant.PACKAGE_NAME_JMESPATH_PATTERN);
190190
}
191191

192+
@Override
193+
public String getBatchManagerPackageName(String serviceName) {
194+
return getCustomizedPackageName(concatServiceNameIfShareModel(serviceName), Constant.PACKAGE_NAME_BATCHMANAGER_PATTERN);
195+
}
196+
192197
@Override
193198
public String getSmokeTestPackageName(String serviceName) {
194199

codegen/src/main/java/software/amazon/awssdk/codegen/naming/NamingStrategy.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ public interface NamingStrategy {
7474
*/
7575
String getJmesPathPackageName(String serviceName);
7676

77+
/**
78+
* Retrieve the batchManager package name that should be used based on the service name.
79+
*/
80+
String getBatchManagerPackageName(String serviceName);
81+
7782
/**
7883
* Retrieve the smote test package name that should be used based on the service name.
7984
*/

codegen/src/main/java/software/amazon/awssdk/codegen/poet/PoetExtension.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,4 +196,9 @@ public boolean isResponse(ShapeModel shapeModel) {
196196
public boolean isRequest(ShapeModel shapeModel) {
197197
return shapeModel.getShapeType() == ShapeType.Request;
198198
}
199+
200+
public ClassName getBatchManagerAsyncInterface() {
201+
return ClassName.get(model.getMetadata().getFullBatchManagerPackageName(),
202+
model.getMetadata().getServiceName() + "AsyncBatchManager");
203+
}
199204
}

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/AsyncClientClass.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
package software.amazon.awssdk.codegen.poet.client;
1717

18+
import static com.squareup.javapoet.TypeSpec.Builder;
1819
import static java.util.Collections.singletonList;
1920
import static java.util.stream.Collectors.toList;
2021
import static javax.lang.model.element.Modifier.FINAL;
@@ -104,6 +105,7 @@ public final class AsyncClientClass extends AsyncClientInterface {
104105
private final ClassName serviceClientConfigurationClassName;
105106
private final ServiceClientConfigurationUtils configurationUtils;
106107
private final boolean useSraAuth;
108+
private boolean hasScheduledExecutor;
107109

108110
public AsyncClientClass(GeneratorTaskParams dependencies) {
109111
super(dependencies.getModel());
@@ -139,7 +141,7 @@ protected void addModifiers(TypeSpec.Builder type) {
139141
}
140142

141143
@Override
142-
protected void addFields(TypeSpec.Builder type) {
144+
protected void addFields(Builder type) {
143145
type.addField(FieldSpec.builder(ClassName.get(Logger.class), "log")
144146
.addModifiers(PRIVATE, STATIC, FINAL)
145147
.initializer("$T.getLogger($T.class)", LoggerFactory.class,
@@ -155,6 +157,10 @@ protected void addFields(TypeSpec.Builder type) {
155157
type.addField(AwsJsonProtocolFactory.class, "jsonProtocolFactory", PRIVATE, FINAL);
156158
}
157159

160+
if (shouldAddScheduledExecutor()) {
161+
addScheduledExecutorIfNeeded(type);
162+
}
163+
158164
model.getEndpointOperation().ifPresent(
159165
o -> type.addField(EndpointDiscoveryRefreshCache.class, "endpointDiscoveryCache", PRIVATE));
160166
}
@@ -180,9 +186,6 @@ protected void addAdditionalMethods(TypeSpec.Builder type) {
180186

181187
@Override
182188
protected void addWaiterMethod(TypeSpec.Builder type) {
183-
type.addField(FieldSpec.builder(ClassName.get(ScheduledExecutorService.class), "executorService")
184-
.addModifiers(PRIVATE, FINAL)
185-
.build());
186189

187190
MethodSpec waiter = MethodSpec.methodBuilder("waiter")
188191
.addModifiers(PUBLIC)
@@ -263,14 +266,18 @@ private MethodSpec constructor(TypeSpec.Builder classBuilder) {
263266
builder.endControlFlow();
264267
}
265268

266-
if (model.hasWaiters()) {
269+
if (shouldAddScheduledExecutor()) {
267270
builder.addStatement("this.executorService = clientConfiguration.option($T.SCHEDULED_EXECUTOR_SERVICE)",
268271
SdkClientOption.class);
269272
}
270273

271274
return builder.build();
272275
}
273276

277+
private boolean shouldAddScheduledExecutor() {
278+
return model.hasWaiters() || model.getCustomizationConfig().getBatchManagerSupported();
279+
}
280+
274281
private boolean hasOperationWithEventStreamOutput() {
275282
return model.getOperations().values().stream().anyMatch(OperationModel::hasEventStreamOutput);
276283
}
@@ -547,6 +554,26 @@ protected MethodSpec utilitiesMethod() {
547554
.build();
548555
}
549556

557+
@Override
558+
protected void addBatchManagerMethod(Builder type) {
559+
560+
String scheduledExecutor = "executorService";
561+
ClassName returnType;
562+
563+
returnType = poetExtensions.getBatchManagerAsyncInterface();
564+
565+
MethodSpec batchManager = MethodSpec.methodBuilder("batchManager")
566+
.addModifiers(PUBLIC)
567+
.addAnnotation(Override.class)
568+
.returns(returnType)
569+
.addStatement("return $T.builder().client(this).scheduledExecutor($N).build()",
570+
returnType, scheduledExecutor)
571+
.build();
572+
573+
574+
type.addMethod(batchManager);
575+
}
576+
550577
private MethodSpec resolveMetricPublishersMethod() {
551578
String clientConfigName = "clientConfiguration";
552579
String requestOverrideConfigName = "requestOverrideConfiguration";
@@ -621,4 +648,13 @@ private boolean hasStreamingV4AuthOperations() {
621648
return model.getOperations().values().stream()
622649
.anyMatch(this::shouldUseAsyncWithBodySigner);
623650
}
651+
652+
private void addScheduledExecutorIfNeeded(Builder classBuilder) {
653+
if (!hasScheduledExecutor) {
654+
classBuilder.addField(FieldSpec.builder(ClassName.get(ScheduledExecutorService.class), "executorService")
655+
.addModifiers(PRIVATE, FINAL)
656+
.build());
657+
hasScheduledExecutor = true;
658+
}
659+
}
624660
}

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/AsyncClientInterface.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ public TypeSpec poetSpec() {
9696
if (model.hasWaiters()) {
9797
addWaiterMethod(result);
9898
}
99+
if (model.getCustomizationConfig().getBatchManagerSupported()) {
100+
addBatchManagerMethod(result);
101+
}
99102
result.addMethod(serviceClientConfigMethod());
100103
addAdditionalMethods(result);
101104
addCloseMethod(result);
@@ -162,6 +165,16 @@ protected void addWaiterMethod(TypeSpec.Builder type) {
162165
type.addMethod(waiterOperationBody(builder).build());
163166
}
164167

168+
protected void addBatchManagerMethod(TypeSpec.Builder type) {
169+
ClassName returnType = poetExtensions.getBatchManagerAsyncInterface();
170+
MethodSpec.Builder builder = MethodSpec.methodBuilder("batchManager")
171+
.addModifiers(PUBLIC)
172+
.returns(returnType)
173+
.addJavadoc("Creates an instance of {@link $T} object with the "
174+
+ "configuration set on this client.", returnType);
175+
type.addMethod(batchManagerOperationBody(builder).build());
176+
}
177+
165178
@Override
166179
public ClassName className() {
167180
return className;
@@ -532,4 +545,10 @@ protected MethodSpec.Builder waiterOperationBody(MethodSpec.Builder builder) {
532545
return builder.addModifiers(DEFAULT, PUBLIC)
533546
.addStatement("throw new $T()", UnsupportedOperationException.class);
534547
}
548+
549+
protected MethodSpec.Builder batchManagerOperationBody(MethodSpec.Builder builder) {
550+
return builder.addModifiers(DEFAULT, PUBLIC)
551+
.addStatement("throw new $T()", UnsupportedOperationException.class);
552+
}
553+
535554
}

codegen/src/main/java/software/amazon/awssdk/codegen/poet/client/DelegatingAsyncClientClass.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,4 +217,9 @@ protected MethodSpec.Builder utilitiesOperationBody(MethodSpec.Builder builder)
217217
protected MethodSpec.Builder waiterOperationBody(MethodSpec.Builder builder) {
218218
return builder.addAnnotation(Override.class).addStatement("return delegate.waiter()");
219219
}
220+
221+
@Override
222+
protected MethodSpec.Builder batchManagerOperationBody(MethodSpec.Builder builder) {
223+
return builder.addAnnotation(Override.class).addStatement("return delegate.batchManager()");
224+
}
220225
}

codegen/src/test/java/software/amazon/awssdk/codegen/poet/ClientTestModels.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,18 @@ public static IntermediateModel internalConfigModels() {
398398
return new IntermediateModelBuilder(models).build();
399399
}
400400

401+
public static IntermediateModel batchManagerModels() {
402+
File serviceModel = new File(ClientTestModels.class.getResource("client/c2j/batchmanager/service-2.json").getFile());
403+
File customizationModel = new File(ClientTestModels.class.getResource("client/c2j/batchmanager/customization.config").getFile());
404+
405+
C2jModels models = C2jModels.builder()
406+
.serviceModel(getServiceModel(serviceModel))
407+
.customizationConfig(getCustomizationConfig(customizationModel))
408+
.build();
409+
410+
return new IntermediateModelBuilder(models).build();
411+
}
412+
401413
private static ServiceModel getServiceModel(File file) {
402414
return ModelLoaderUtils.loadModel(ServiceModel.class, file);
403415
}

codegen/src/test/java/software/amazon/awssdk/codegen/poet/client/AsyncClientClassTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static org.hamcrest.MatcherAssert.assertThat;
1919
import static software.amazon.awssdk.codegen.poet.ClientTestModels.awsJsonServiceModels;
2020
import static software.amazon.awssdk.codegen.poet.ClientTestModels.awsQueryCompatibleJsonServiceModels;
21+
import static software.amazon.awssdk.codegen.poet.ClientTestModels.batchManagerModels;
2122
import static software.amazon.awssdk.codegen.poet.ClientTestModels.customContentTypeModels;
2223
import static software.amazon.awssdk.codegen.poet.ClientTestModels.customPackageModels;
2324
import static software.amazon.awssdk.codegen.poet.ClientTestModels.endpointDiscoveryModels;
@@ -92,6 +93,12 @@ public void asyncClientCustomPackageName() {
9293
assertThat(syncClientCustomServiceMetaData, generatesTo("test-custompackage-async.java"));
9394
}
9495

96+
@Test
97+
public void asyncClientBatchManager() {
98+
ClassSpec aSyncClientBatchManager = createAsyncClientClass(batchManagerModels());
99+
assertThat(aSyncClientBatchManager, generatesTo("test-batchmanager-async.java"));
100+
}
101+
95102
private AsyncClientClass createAsyncClientClass(IntermediateModel model) {
96103
return new AsyncClientClass(GeneratorTaskParams.create(model, "sources/", "tests/", "resources/"));
97104
}

codegen/src/test/java/software/amazon/awssdk/codegen/poet/client/AsyncClientInterfaceTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package software.amazon.awssdk.codegen.poet.client;
1717

1818
import static org.hamcrest.MatcherAssert.assertThat;
19+
import static software.amazon.awssdk.codegen.poet.ClientTestModels.batchManagerModels;
1920
import static software.amazon.awssdk.codegen.poet.ClientTestModels.restJsonServiceModels;
2021
import static software.amazon.awssdk.codegen.poet.PoetMatchers.generatesTo;
2122

@@ -28,4 +29,10 @@ public void asyncClientInterface() {
2829
ClassSpec asyncClientInterface = new AsyncClientInterface(restJsonServiceModels());
2930
assertThat(asyncClientInterface, generatesTo("test-json-async-client-interface.java"));
3031
}
32+
33+
@Test
34+
public void asyncClientInterfaceWithBatchManager() {
35+
ClassSpec asyncClientInterface = new AsyncClientInterface(batchManagerModels());
36+
assertThat(asyncClientInterface, generatesTo("test-json-async-client-interface-batchmanager.java"));
37+
}
3138
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
{
2+
"batchManagerSupported": true
3+
}

0 commit comments

Comments
 (0)