Skip to content

Commit b19b34a

Browse files
committed
RequestOverride configuration check added to Bypass batch manager
1 parent a81119b commit b19b34a

File tree

2 files changed

+137
-90
lines changed

2 files changed

+137
-90
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveMessageBatchManager.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.util.concurrent.ScheduledExecutorService;
2222
import software.amazon.awssdk.annotations.SdkInternalApi;
2323
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
24-
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
2524
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
2625
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
2726
import software.amazon.awssdk.utils.Logger;
@@ -48,12 +47,12 @@ public ReceiveMessageBatchManager(SqsAsyncClient sqsClient,
4847
}
4948

5049
public CompletableFuture<ReceiveMessageResponse> batchRequest(ReceiveMessageRequest request) {
51-
if (canBeRetrievedFromQueueBuffer(request)) {
50+
String ineligibleReason = checkBatchingEligibility(request);
51+
if (ineligibleReason == null) {
5252
return receiveBatchManagerMap.computeIfAbsent(generateBatchKey(request), key -> createReceiveBatchManager(request))
5353
.processRequest(request);
5454
} else {
55-
log.debug(() -> "canBeRetrievedFromQueueBuffer failed, so skipping batching for request for Queue with URL: "
56-
+ request.queueUrl());
55+
log.debug(() -> String.format("Batching skipped. Reason: %s", ineligibleReason));
5756
return sqsClient.receiveMessage(request);
5857
}
5958
}
@@ -79,11 +78,25 @@ public void close() {
7978
receiveBatchManagerMap.values().forEach(ReceiveBatchManager::close);
8079
}
8180

82-
private boolean canBeRetrievedFromQueueBuffer(ReceiveMessageRequest rq) {
83-
return hasCompatibleAttributes(rq) && isBufferingEnabled() && rq.visibilityTimeout() == null;
81+
private String checkBatchingEligibility(ReceiveMessageRequest rq) {
82+
if (!hasCompatibleAttributes(rq)) {
83+
return "Incompatible attributes.";
84+
}
85+
if (rq.visibilityTimeout() != null) {
86+
return "Visibility timeout is set.";
87+
}
88+
if (!isBufferingEnabled()) {
89+
return "Buffering is disabled.";
90+
}
91+
if (rq.overrideConfiguration().isPresent()) {
92+
return "Request has override configurations.";
93+
}
94+
if (rq.waitTimeSeconds() != null && rq.waitTimeSeconds() != 0) {
95+
return "Request has long polling enabled.";
96+
}
97+
return null;
8498
}
8599

86-
87100
private boolean hasCompatibleAttributes(ReceiveMessageRequest rq) {
88101
return !rq.hasAttributeNames()
89102
&& hasCompatibleSystemAttributes(rq)

services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveMessageBatchManagerTest.java

Lines changed: 117 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,26 @@
1616
package software.amazon.awssdk.services.sqs.batchmanager;
1717

1818

19+
import static org.assertj.core.api.Assertions.assertThat;
1920
import static org.junit.jupiter.api.Assertions.assertEquals;
20-
import static org.junit.jupiter.api.Assertions.assertNotEquals;
2121
import static org.mockito.ArgumentCaptor.forClass;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.atLeast;
24+
import static org.mockito.Mockito.times;
25+
import static org.mockito.Mockito.verify;
26+
import static org.mockito.Mockito.when;
2227

28+
import java.time.Duration;
2329
import java.util.Collections;
2430
import java.util.HashMap;
2531
import java.util.Map;
32+
import java.util.concurrent.CompletableFuture;
2633
import java.util.concurrent.Executors;
34+
import java.util.concurrent.ScheduledExecutorService;
2735
import java.util.concurrent.TimeUnit;
2836
import java.util.stream.Stream;
37+
38+
import org.apache.logging.log4j.Level;
2939
import org.junit.jupiter.api.DisplayName;
3040
import org.junit.jupiter.api.extension.ExtendWith;
3141
import org.junit.jupiter.params.ParameterizedTest;
@@ -43,83 +53,92 @@
4353
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
4454
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
4555
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
46-
47-
import java.util.concurrent.CompletableFuture;
48-
import java.util.concurrent.ScheduledExecutorService;
49-
50-
import static org.mockito.ArgumentMatchers.any;
51-
import static org.mockito.Mockito.*;
52-
56+
import software.amazon.awssdk.testutils.LogCaptor;
5357

5458
@ExtendWith(MockitoExtension.class)
5559
class ReceiveMessageBatchManagerTest {
5660

61+
private static final ScheduledExecutorService EXECUTOR = Executors.newScheduledThreadPool(4);
62+
5763
@Mock
5864
private SqsAsyncClient sqsClient;
5965

60-
private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
61-
6266
private ReceiveMessageBatchManager receiveMessageBatchManager;
6367

64-
6568
@ParameterizedTest(name = "{index} => {0}")
6669
@MethodSource("provideBatchOverrideConfigurations")
6770
@DisplayName("Test BatchRequest with various configurations")
68-
void testBatchRequest_WhenBufferingDisabledAndInCompatible_ShouldNotUseBatchManager(String testCaseName,
69-
ResponseBatchConfiguration overrideConfig,
70-
ReceiveMessageRequest request,
71-
boolean useBatchManager) throws Exception {
72-
73-
// Initialize the ResponseBatchConfiguration and ReceiveMessageBatchManager
74-
ResponseBatchConfiguration config = ResponseBatchConfiguration.builder()
75-
.messageSystemAttributeNames(overrideConfig.messageSystemAttributeNames())
76-
.receiveMessageAttributeNames(overrideConfig.receiveMessageAttributeNames())
77-
.visibilityTimeout(overrideConfig.visibilityTimeout())
78-
.messageMinWaitDuration(overrideConfig.messageMinWaitDuration()).build();
79-
80-
receiveMessageBatchManager = new ReceiveMessageBatchManager(sqsClient, executor, overrideConfig);
81-
82-
CompletableFuture<ReceiveMessageResponse> mockResponse =
83-
CompletableFuture.completedFuture(ReceiveMessageResponse.builder().build());
84-
String visibilityTimeout = "1";
85-
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(mockResponse);
86-
if(useBatchManager) {
87-
mockGetQueueAttributesResponse("0", visibilityTimeout);
88-
}
71+
void testBatchRequest(String testCaseName,
72+
ResponseBatchConfiguration overrideConfig,
73+
ReceiveMessageRequest request,
74+
boolean useBatchManager,
75+
String inEligibleReason) throws Exception {
8976

77+
setupBatchManager(overrideConfig);
9078

91-
CompletableFuture<ReceiveMessageResponse> result = receiveMessageBatchManager.batchRequest(request);
92-
result.get(2, TimeUnit.SECONDS);
79+
CompletableFuture<ReceiveMessageResponse> mockResponse = CompletableFuture.completedFuture(
80+
ReceiveMessageResponse.builder().build());
81+
when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))).thenReturn(mockResponse);
9382

94-
// Enough time to make sure any spawned task after receiving response is completed
95-
Thread.sleep(500);
83+
try (LogCaptor logCaptor = LogCaptor.create(Level.DEBUG)) {
9684

97-
// Capture the argument passed to receiveMessage
98-
ArgumentCaptor<ReceiveMessageRequest> requestCaptor = forClass(ReceiveMessageRequest.class);
85+
if (useBatchManager) {
86+
mockQueueAttributes("0", "1");
87+
}
9988

100-
if (useBatchManager) {
101-
verify(sqsClient, atLeast(1)).receiveMessage(requestCaptor.capture());
89+
CompletableFuture<ReceiveMessageResponse> result = receiveMessageBatchManager.batchRequest(request);
90+
result.get(2, TimeUnit.SECONDS);
91+
Thread.sleep(500);
10292

103-
// Assertions to verify the behavior when batch manager is used
104-
assertEquals(config.maxBatchItems(), requestCaptor.getValue().maxNumberOfMessages());
105-
assertEquals(Integer.parseInt(visibilityTimeout), requestCaptor.getValue().visibilityTimeout());
106-
} else {
107-
verify(sqsClient, times(1)).receiveMessage(requestCaptor.capture());
93+
ArgumentCaptor<ReceiveMessageRequest> requestCaptor = forClass(ReceiveMessageRequest.class);
10894

109-
// Assertions to verify the behavior when batch manager is not used
110-
assertEquals(request.maxNumberOfMessages(), requestCaptor.getValue().maxNumberOfMessages());
111-
assertEquals(request.visibilityTimeout(), requestCaptor.getValue().visibilityTimeout());
112-
assertNotEquals(config.maxBatchItems(),
113-
requestCaptor.getValue().maxNumberOfMessages());
95+
if (useBatchManager) {
96+
verifyBatchManagerUsed(requestCaptor);
97+
} else {
98+
verifyBatchManagerNotUsed(request, requestCaptor, logCaptor, inEligibleReason);
99+
}
114100
}
115101
}
116102

103+
private void setupBatchManager(ResponseBatchConfiguration overrideConfig) {
104+
receiveMessageBatchManager = new ReceiveMessageBatchManager(sqsClient, EXECUTOR, overrideConfig);
105+
}
117106

107+
private void verifyBatchManagerUsed(ArgumentCaptor<ReceiveMessageRequest> requestCaptor) {
108+
verify(sqsClient, atLeast(1)).receiveMessage(requestCaptor.capture());
109+
assertEquals(ResponseBatchConfiguration.MAX_DONE_RECEIVE_BATCHES_DEFAULT,
110+
requestCaptor.getValue().maxNumberOfMessages());
111+
}
112+
113+
private void verifyBatchManagerNotUsed(ReceiveMessageRequest request,
114+
ArgumentCaptor<ReceiveMessageRequest> requestCaptor,
115+
LogCaptor logCaptor,
116+
String inEligibleReason) {
117+
verify(sqsClient, times(1)).receiveMessage(requestCaptor.capture());
118+
assertEquals(request.maxNumberOfMessages(), requestCaptor.getValue().maxNumberOfMessages());
119+
assertEquals(request.visibilityTimeout(), requestCaptor.getValue().visibilityTimeout());
120+
assertThat(logCaptor.loggedEvents())
121+
.anySatisfy(logEvent -> assertThat(logEvent.getMessage().getFormattedMessage())
122+
.contains(inEligibleReason));
123+
}
124+
125+
private void mockQueueAttributes(String receiveMessageWaitTimeSeconds, String visibilityTimeout) {
126+
Map<QueueAttributeName, String> attributes = new HashMap<>();
127+
attributes.put(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, receiveMessageWaitTimeSeconds);
128+
attributes.put(QueueAttributeName.VISIBILITY_TIMEOUT, visibilityTimeout);
129+
130+
GetQueueAttributesResponse response = GetQueueAttributesResponse.builder()
131+
.attributes(attributes)
132+
.build();
133+
134+
when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class)))
135+
.thenReturn(CompletableFuture.completedFuture(response));
136+
}
118137

119138
private static Stream<Arguments> provideBatchOverrideConfigurations() {
120139
return Stream.of(
121140
Arguments.of(
122-
"Buffering enabled, compatible system and message attributes, and no visibility timeout",
141+
"Buffering enabled, compatible system and message attributes, no visibility timeout",
123142
ResponseBatchConfiguration.builder()
124143
.receiveMessageAttributeNames(Collections.singletonList("attr1"))
125144
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID))
@@ -129,10 +148,11 @@ private static Stream<Arguments> provideBatchOverrideConfigurations() {
129148
.messageAttributeNames(Collections.singletonList("attr1"))
130149
.messageSystemAttributeNames(MessageSystemAttributeName.SENDER_ID)
131150
.build(),
132-
true
151+
true,
152+
""
133153
),
134154
Arguments.of(
135-
"Buffering , compatible attributes, and no visibility timeout",
155+
"Buffering enabled, compatible attributes, no visibility timeout but deprecated attributeNames",
136156
ResponseBatchConfiguration.builder()
137157
.receiveMessageAttributeNames(Collections.singletonList("attr1"))
138158
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID))
@@ -141,27 +161,27 @@ private static Stream<Arguments> provideBatchOverrideConfigurations() {
141161
.queueUrl("testQueueUrl")
142162
.messageAttributeNames(Collections.singletonList("attr1"))
143163
.messageSystemAttributeNamesWithStrings(Collections.singletonList("SenderId"))
144-
// attributeNames which is Deprecated api not supported for Batching
145-
.attributeNames(QueueAttributeName.ALL)
164+
.attributeNames(QueueAttributeName.ALL) // Deprecated api not supported for Batching
146165
.build(),
147-
false
166+
false,
167+
"Incompatible attributes."
148168
),
149169
Arguments.of(
150-
"Buffering disabled, incompatible system attributes, and no visibility timeout",
170+
"Buffering disabled, incompatible system attributes, no visibility timeout",
151171
ResponseBatchConfiguration.builder()
152172
.receiveMessageAttributeNames(Collections.singletonList("attr1"))
153173
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENT_TIMESTAMP))
154174
.build(),
155175
ReceiveMessageRequest.builder()
156176
.queueUrl("testQueueUrl")
157177
.messageAttributeNames(Collections.singletonList("attr1"))
158-
.messageSystemAttributeNamesWithStrings(Collections.singletonList("SenderId")) //
159-
// Incompatible system attribute
178+
.messageSystemAttributeNamesWithStrings(Collections.singletonList("SenderId"))
160179
.build(),
161-
false
180+
false,
181+
"Incompatible attributes."
162182
),
163183
Arguments.of(
164-
"Buffering disabled, compatible attributes, but visibility timeout is set",
184+
"Buffering disabled, compatible attributes, visibility timeout is set",
165185
ResponseBatchConfiguration.builder()
166186
.receiveMessageAttributeNames(Collections.singletonList("attr1"))
167187
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID))
@@ -170,12 +190,13 @@ private static Stream<Arguments> provideBatchOverrideConfigurations() {
170190
.queueUrl("testQueueUrl")
171191
.messageAttributeNames(Collections.singletonList("attr1"))
172192
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID))
173-
.visibilityTimeout(30) // Visibility timeout is set
193+
.visibilityTimeout(30)
174194
.build(),
175-
false
195+
false,
196+
"Visibility timeout is set."
176197
),
177198
Arguments.of(
178-
"Buffering disabled, compatible attributes, no visibility timeout, but request has attribute names",
199+
"Buffering disabled, compatible attributes, no visibility timeout but has attribute names",
179200
ResponseBatchConfiguration.builder()
180201
.receiveMessageAttributeNames(Collections.singletonList("attr1"))
181202
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID))
@@ -184,36 +205,49 @@ private static Stream<Arguments> provideBatchOverrideConfigurations() {
184205
.queueUrl("testQueueUrl")
185206
.messageAttributeNames(Collections.singletonList("attr1"))
186207
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID))
187-
.attributeNamesWithStrings("All") // Request has attribute names
208+
.attributeNamesWithStrings("All")
188209
.build(),
189-
false
210+
false,
211+
"Incompatible attributes."
190212
),
191213
Arguments.of(
192-
"Buffering enabled, with messageSystemAttributeName in Config and simple ReceiveMessageRequest",
214+
"Buffering enabled, simple ReceiveMessageRequest, no visibility timeout",
193215
ResponseBatchConfiguration.builder()
194216
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID))
195217
.build(),
196218
ReceiveMessageRequest.builder()
197219
.queueUrl("testQueueUrl")
198220
.maxNumberOfMessages(3)
199221
.build(),
200-
true
222+
true,
223+
""
224+
),
225+
Arguments.of(
226+
"Buffering disabled, request has override config",
227+
ResponseBatchConfiguration.builder()
228+
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID))
229+
.build(),
230+
ReceiveMessageRequest.builder()
231+
.queueUrl("testQueueUrl")
232+
.maxNumberOfMessages(3)
233+
.overrideConfiguration(o -> o.apiCallTimeout(Duration.ofSeconds(2)))
234+
.build(),
235+
false,
236+
"Request has override configurations."
237+
),
238+
Arguments.of(
239+
"Buffering disabled, with waitTimeSeconds in ReceiveMessageRequest",
240+
ResponseBatchConfiguration.builder()
241+
.messageSystemAttributeNames(Collections.singletonList(MessageSystemAttributeName.SENDER_ID))
242+
.build(),
243+
ReceiveMessageRequest.builder()
244+
.queueUrl("testQueueUrl")
245+
.maxNumberOfMessages(3)
246+
.waitTimeSeconds(3)
247+
.build(),
248+
false,
249+
"Request has long polling enabled."
201250
)
202251
);
203252
}
204-
205-
private void mockGetQueueAttributesResponse(String receiveMessageWaitTimeSeconds, String visibilityTimeout) {
206-
Map<QueueAttributeName, String> attributes = new HashMap<>();
207-
attributes.put(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS, receiveMessageWaitTimeSeconds);
208-
attributes.put(QueueAttributeName.VISIBILITY_TIMEOUT, visibilityTimeout);
209-
210-
GetQueueAttributesResponse response = GetQueueAttributesResponse.builder()
211-
.attributes(attributes)
212-
.build();
213-
214-
when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class)))
215-
.thenReturn(CompletableFuture.completedFuture(response));
216-
}
217-
218-
219253
}

0 commit comments

Comments
 (0)