Skip to content

Commit 5a887a7

Browse files
committed
Handled review comments
1 parent 6ff4a77 commit 5a887a7

22 files changed

+174
-212
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/batchmanager/BatchOverrideConfiguration.java

Lines changed: 42 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -36,30 +36,28 @@ public final class BatchOverrideConfiguration implements ToCopyableBuilder<Batch
3636
private final Integer maxBatchItems;
3737
private final Integer maxBatchKeys;
3838
private final Integer maxBufferSize;
39-
private final Duration maxBatchOpenInMs;
39+
private final Duration maxBatchOpen;
4040
private final Duration visibilityTimeout;
41-
private final Integer longPollWaitTimeoutSeconds;
42-
private final Duration minReceiveWaitTimeMs;
41+
private final Duration longPollWaitTimeout;
42+
private final Duration minReceiveWaitTime;
4343
private final Integer maxDoneReceiveBatches;
4444
private final List<String> receiveAttributeNames;
4545
private final List<String> receiveMessageAttributeNames;
4646
private final Boolean adaptivePrefetching;
4747
private final Integer maxInflightReceiveBatches;
48-
private final Boolean longPoll;
4948

5049
public BatchOverrideConfiguration(Builder builder) {
5150
this.maxBatchItems = Validate.isPositiveOrNull(builder.maxBatchItems, "maxBatchItems");
5251
this.maxBatchKeys = Validate.isPositiveOrNull(builder.maxBatchKeys, "maxBatchKeys");
5352
this.maxBufferSize = Validate.isPositiveOrNull(builder.maxBufferSize, "maxBufferSize");
54-
this.maxBatchOpenInMs = Validate.isPositiveOrNull(builder.maxBatchOpenInMs, "maxBatchOpenInMs");
55-
this.visibilityTimeout = Validate.isPositiveOrNull(builder.visibilityTimeout, "visibilityTimeoutSeconds");
56-
this.longPollWaitTimeoutSeconds = Validate.isPositiveOrNull(builder.longPollWaitTimeoutSeconds,
57-
"longPollWaitTimeoutSeconds");
58-
this.minReceiveWaitTimeMs = Validate.isPositiveOrNull(builder.minReceiveWaitTimeMs, "minReceiveWaitTimeMs");
53+
this.maxBatchOpen = Validate.isPositiveOrNull(builder.maxBatchOpen, "maxBatchOpen");
54+
this.visibilityTimeout = Validate.isPositiveOrNull(builder.visibilityTimeout, "visibilityTimeout");
55+
this.longPollWaitTimeout = Validate.isPositiveOrNull(builder.longPollWaitTimeout,
56+
"longPollWaitTimeout");
57+
this.minReceiveWaitTime = Validate.isPositiveOrNull(builder.minReceiveWaitTime, "minReceiveWaitTime");
5958
this.receiveAttributeNames = builder.receiveAttributeNames;
6059
this.receiveMessageAttributeNames = builder.receiveMessageAttributeNames;
6160
this.adaptivePrefetching = builder.adaptivePrefetching;
62-
this.longPoll = builder.longPoll;
6361
this.maxInflightReceiveBatches = builder.maxInflightReceiveBatches;
6462
this.maxDoneReceiveBatches = builder.maxDoneReceiveBatches;
6563
}
@@ -97,8 +95,8 @@ public Optional<Integer> maxDoneReceiveBatches() {
9795
* @return the optional maximum amount of time (in milliseconds) that an outgoing call waits to be batched with messages of
9896
* the same type.
9997
*/
100-
public Optional<Duration> maxBatchOpenInMs() {
101-
return Optional.ofNullable(maxBatchOpenInMs);
98+
public Optional<Duration> maxBatchOpen() {
99+
return Optional.ofNullable(maxBatchOpen);
102100
}
103101

104102
/**
@@ -112,15 +110,15 @@ public Optional<Duration> visibilityTimeout() {
112110
* @return the amount of time, in seconds, the receive call will block on the server waiting for messages to arrive if the
113111
* queue is empty when the receive call is first made.
114112
*/
115-
public Optional<Integer> longPollWaitTimeoutSeconds() {
116-
return Optional.ofNullable(longPollWaitTimeoutSeconds);
113+
public Optional<Duration> longPollWaitTimeout() {
114+
return Optional.ofNullable(longPollWaitTimeout);
117115
}
118116

119117
/**
120118
* @return the minimum wait time for incoming receive message requests.
121119
*/
122-
public Optional<Duration> minReceiveWaitTimeMs() {
123-
return Optional.ofNullable(minReceiveWaitTimeMs);
120+
public Optional<Duration> minReceiveWaitTime() {
121+
return Optional.ofNullable(minReceiveWaitTime);
124122
}
125123

126124
/**
@@ -144,13 +142,6 @@ public Optional<Boolean> adaptivePrefetching() {
144142
return Optional.ofNullable(adaptivePrefetching);
145143
}
146144

147-
/**
148-
* @return the option for long polling.
149-
*/
150-
public Optional<Boolean> longPoll() {
151-
return Optional.ofNullable(longPoll);
152-
}
153-
154145
/**
155146
* @return the maximum number of concurrent receive message batches.
156147
*/
@@ -163,16 +154,15 @@ public Builder toBuilder() {
163154
return new Builder().maxBatchItems(maxBatchItems)
164155
.maxBatchKeys(maxBatchKeys)
165156
.maxBufferSize(maxBufferSize)
166-
.maxBatchOpenInMs(maxBatchOpenInMs)
157+
.maxBatchOpen(maxBatchOpen)
167158
.visibilityTimeout(visibilityTimeout)
168-
.longPollWaitTimeoutSeconds(longPollWaitTimeoutSeconds)
169-
.minReceiveWaitTimeMs(minReceiveWaitTimeMs)
159+
.longPollWaitTimeout(longPollWaitTimeout)
160+
.minReceiveWaitTime(minReceiveWaitTime)
170161
.maxInflightReceiveBatches(maxInflightReceiveBatches)
171162
.receiveAttributeNames(receiveAttributeNames)
172163
.receiveMessageAttributeNames(receiveMessageAttributeNames)
173164
.adaptivePrefetching(adaptivePrefetching)
174-
.maxDoneReceiveBatches(maxDoneReceiveBatches)
175-
.longPoll(longPoll);
165+
.maxDoneReceiveBatches(maxDoneReceiveBatches);
176166
}
177167

178168
@Override
@@ -181,16 +171,15 @@ public String toString() {
181171
.add("maxBatchItems", maxBatchItems)
182172
.add("maxBatchKeys", maxBatchKeys)
183173
.add("maxBufferSize", maxBufferSize)
184-
.add("maxBatchOpenInMs", maxBatchOpenInMs.toMillis())
185-
.add("visibilityTimeoutSeconds", visibilityTimeout)
186-
.add("longPollWaitTimeoutSeconds", longPollWaitTimeoutSeconds)
187-
.add("minReceiveWaitTimeMs", minReceiveWaitTimeMs)
174+
.add("maxBatchOpen", maxBatchOpen.toMillis())
175+
.add("visibilityTimeout", visibilityTimeout)
176+
.add("longPollWaitTimeout", longPollWaitTimeout)
177+
.add("minReceiveWaitTime", minReceiveWaitTime)
188178
.add("receiveAttributeNames", receiveAttributeNames)
189179
.add("receiveMessageAttributeNames", receiveMessageAttributeNames)
190180
.add("adaptivePrefetching", adaptivePrefetching)
191181
.add("maxInflightReceiveBatches", maxInflightReceiveBatches)
192182
.add("maxDoneReceiveBatches", maxDoneReceiveBatches)
193-
.add("longPoll", longPoll)
194183
.build();
195184
}
196185

@@ -215,19 +204,19 @@ public boolean equals(Object o) {
215204
return false;
216205
}
217206

218-
if (maxBatchOpenInMs != null ? !maxBatchOpenInMs.equals(that.maxBatchOpenInMs) : that.maxBatchOpenInMs != null) {
207+
if (maxBatchOpen != null ? !maxBatchOpen.equals(that.maxBatchOpen) : that.maxBatchOpen != null) {
219208
return false;
220209
}
221210
if (visibilityTimeout != null ? !visibilityTimeout.equals(that.visibilityTimeout) :
222211
that.visibilityTimeout != null) {
223212
return false;
224213
}
225-
if (longPollWaitTimeoutSeconds != null ? !longPollWaitTimeoutSeconds.equals(that.longPollWaitTimeoutSeconds) :
226-
that.longPollWaitTimeoutSeconds != null) {
214+
if (longPollWaitTimeout != null ? !longPollWaitTimeout.equals(that.longPollWaitTimeout) :
215+
that.longPollWaitTimeout != null) {
227216
return false;
228217
}
229-
if (minReceiveWaitTimeMs != null ? !minReceiveWaitTimeMs.equals(that.minReceiveWaitTimeMs) :
230-
that.minReceiveWaitTimeMs != null) {
218+
if (minReceiveWaitTime != null ? !minReceiveWaitTime.equals(that.minReceiveWaitTime) :
219+
that.minReceiveWaitTime != null) {
231220
return false;
232221
}
233222
if (receiveAttributeNames != null ? !receiveAttributeNames.equals(that.receiveAttributeNames) :
@@ -242,9 +231,6 @@ public boolean equals(Object o) {
242231
that.adaptivePrefetching != null) {
243232
return false;
244233
}
245-
if (longPoll != null ? !longPoll.equals(that.longPoll) : that.longPoll != null) {
246-
return false;
247-
}
248234
if (maxInflightReceiveBatches != null ? !maxInflightReceiveBatches.equals(that.maxInflightReceiveBatches) :
249235
that.maxInflightReceiveBatches != null) {
250236
return false;
@@ -258,14 +244,13 @@ public int hashCode() {
258244
int result = maxBatchItems != null ? maxBatchItems.hashCode() : 0;
259245
result = 31 * result + (maxBatchKeys != null ? maxBatchKeys.hashCode() : 0);
260246
result = 31 * result + (maxBufferSize != null ? maxBufferSize.hashCode() : 0);
261-
result = 31 * result + (maxBatchOpenInMs != null ? maxBatchOpenInMs.hashCode() : 0);
247+
result = 31 * result + (maxBatchOpen != null ? maxBatchOpen.hashCode() : 0);
262248
result = 31 * result + (visibilityTimeout != null ? visibilityTimeout.hashCode() : 0);
263-
result = 31 * result + (longPollWaitTimeoutSeconds != null ? longPollWaitTimeoutSeconds.hashCode() : 0);
264-
result = 31 * result + (minReceiveWaitTimeMs != null ? minReceiveWaitTimeMs.hashCode() : 0);
249+
result = 31 * result + (longPollWaitTimeout != null ? longPollWaitTimeout.hashCode() : 0);
250+
result = 31 * result + (minReceiveWaitTime != null ? minReceiveWaitTime.hashCode() : 0);
265251
result = 31 * result + (receiveAttributeNames != null ? receiveAttributeNames.hashCode() : 0);
266252
result = 31 * result + (receiveMessageAttributeNames != null ? receiveMessageAttributeNames.hashCode() : 0);
267253
result = 31 * result + (adaptivePrefetching != null ? adaptivePrefetching.hashCode() : 0);
268-
result = 31 * result + (longPoll != null ? longPoll.hashCode() : 0);
269254
result = 31 * result + (maxInflightReceiveBatches != null ? maxInflightReceiveBatches.hashCode() : 0);
270255
result = 31 * result + (maxDoneReceiveBatches != null ? maxDoneReceiveBatches.hashCode() : 0);
271256
return result;
@@ -276,16 +261,15 @@ public static final class Builder implements CopyableBuilder<Builder, BatchOverr
276261
private Integer maxBatchItems;
277262
private Integer maxBatchKeys;
278263
private Integer maxBufferSize;
279-
private Duration maxBatchOpenInMs;
264+
private Duration maxBatchOpen;
280265
private Duration visibilityTimeout;
281-
private Integer longPollWaitTimeoutSeconds;
282-
private Duration minReceiveWaitTimeMs;
266+
private Duration longPollWaitTimeout;
267+
private Duration minReceiveWaitTime;
283268
private Integer maxDoneReceiveBatches;
284269
private Integer maxInflightReceiveBatches;
285270
private List<String> receiveAttributeNames = Collections.emptyList();
286271
private List<String> receiveMessageAttributeNames = Collections.emptyList();
287272
private Boolean adaptivePrefetching;
288-
private Boolean longPoll;
289273

290274
private Builder() {
291275
}
@@ -331,11 +315,11 @@ public Builder maxBufferSize(Integer maxBufferSize) {
331315
* Define the maximum amount of time (in milliseconds) that an outgoing call waits for other requests before sending out a
332316
* batch request.
333317
*
334-
* @param maxBatchOpenInMs The new maxBatchOpenInMs value.
318+
* @param maxBatchOpen The new maxBatchOpen value.
335319
* @return This object for method chaining.
336320
*/
337-
public Builder maxBatchOpenInMs(Duration maxBatchOpenInMs) {
338-
this.maxBatchOpenInMs = maxBatchOpenInMs;
321+
public Builder maxBatchOpen(Duration maxBatchOpen) {
322+
this.maxBatchOpen = maxBatchOpen;
339323
return this;
340324
}
341325

@@ -356,11 +340,11 @@ public Builder visibilityTimeout(Duration visibilityTimeout) {
356340
* Define the amount of time, in seconds, the receive call will block on the server waiting for messages to arrive if the
357341
* queue is empty when the receive call is first made. This setting has no effect if long polling is disabled.
358342
*
359-
* @param longPollWaitTimeoutSeconds The new longPollWaitTimeoutSeconds value.
343+
* @param longPollWaitTimeout The new longPollWaitTimeout value.
360344
* @return This object for method chaining.
361345
*/
362-
public Builder longPollWaitTimeoutSeconds(Integer longPollWaitTimeoutSeconds) {
363-
this.longPollWaitTimeoutSeconds = longPollWaitTimeoutSeconds;
346+
public Builder longPollWaitTimeout(Duration longPollWaitTimeout) {
347+
this.longPollWaitTimeout = longPollWaitTimeout;
364348
return this;
365349
}
366350

@@ -369,11 +353,11 @@ public Builder longPollWaitTimeoutSeconds(Integer longPollWaitTimeoutSeconds) {
369353
* easily waste CPU time busy-waiting against empty local buffers. Avoid setting this to 0 unless you are confident
370354
* threads will do useful work in-between each call to receive messages!
371355
*
372-
* @param minReceiveWaitTimeMs The new minReceiveWaitTimeMs value.
356+
* @param minReceiveWaitTime The new minReceiveWaitTime value.
373357
* @return This object for method chaining.
374358
*/
375-
public Builder minReceiveWaitTimeMs(Duration minReceiveWaitTimeMs) {
376-
this.minReceiveWaitTimeMs = minReceiveWaitTimeMs;
359+
public Builder minReceiveWaitTime(Duration minReceiveWaitTime) {
360+
this.minReceiveWaitTime = minReceiveWaitTime;
377361
return this;
378362
}
379363

@@ -445,17 +429,6 @@ public Builder adaptivePrefetching(Boolean adaptivePrefetching) {
445429
return this;
446430
}
447431

448-
/**
449-
* Define the option for long polling. Specify "true" for receive requests to use long polling.
450-
*
451-
* @param longPoll The new longPoll value.
452-
* @return This object for method chaining.
453-
*/
454-
public Builder longPoll(Boolean longPoll) {
455-
this.longPoll = longPoll;
456-
return this;
457-
}
458-
459432
public BatchOverrideConfiguration build() {
460433
return new BatchOverrideConfiguration(this);
461434
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/AsyncReceiveMessageBatch.java

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,10 @@
1717

1818

1919
import java.time.Duration;
20-
import java.time.temporal.ChronoUnit;
21-
import java.util.ArrayList;
2220
import java.util.List;
2321
import java.util.Optional;
2422
import java.util.concurrent.CompletableFuture;
25-
import java.util.concurrent.ScheduledExecutorService;
26-
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.CopyOnWriteArrayList;
2724
import java.util.stream.Collectors;
2825
import java.util.stream.IntStream;
2926
import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -34,6 +31,7 @@
3431
import software.amazon.awssdk.services.sqs.model.Message;
3532
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
3633
import software.amazon.awssdk.utils.Logger;
34+
import software.amazon.awssdk.utils.NumericUtils;
3735

3836
/**
3937
* The {@code AsyncReceiveMessageBatch} class forms a {@link ReceiveMessageRequest} request based on configuration settings,
@@ -49,20 +47,18 @@
4947
public class AsyncReceiveMessageBatch {
5048

5149
private static final Logger log = Logger.loggerFor(AsyncReceiveMessageBatch.class);
52-
53-
private final ScheduledExecutorService scheduledExecutorService;
5450
private final String queueUrl;
5551
private final SqsAsyncClient asyncClient;
5652
private final Duration visibilityTimeout;
5753
private final ResponseBatchConfiguration config;
58-
private final AtomicBoolean open = new AtomicBoolean(false);
5954
private volatile Throwable exception;
60-
private List<Message> messages;
55+
private volatile List<Message> messages = new CopyOnWriteArrayList<>();
6156
private long visibilityDeadlineNano;
6257

63-
public AsyncReceiveMessageBatch(ScheduledExecutorService scheduledExecutorService, String queueUrl,
64-
SqsAsyncClient asyncClient, Duration visibilityTimeout, ResponseBatchConfiguration config) {
65-
this.scheduledExecutorService = scheduledExecutorService;
58+
public AsyncReceiveMessageBatch(String queueUrl,
59+
SqsAsyncClient asyncClient,
60+
Duration visibilityTimeout,
61+
ResponseBatchConfiguration config) {
6662
this.queueUrl = queueUrl;
6763
this.asyncClient = asyncClient;
6864
this.visibilityTimeout = visibilityTimeout;
@@ -77,20 +73,19 @@ public CompletableFuture<AsyncReceiveMessageBatch> asyncReceiveMessage() {
7773
.messageAttributeNames(config.receiveMessageAttributeNames())
7874
.messageAttributeNames(config.receiveMessageAttributeNames());
7975

80-
request.visibilityTimeout((int) this.visibilityTimeout.get(ChronoUnit.SECONDS));
76+
request.visibilityTimeout(NumericUtils.saturatedCast(this.visibilityTimeout.getSeconds()));
8177

82-
if (config.longPoll()) {
83-
request.waitTimeSeconds(config.longPollWaitTimeoutSeconds());
78+
if (config.longPollWaitTimeout() != null) {
79+
request.waitTimeSeconds(NumericUtils.saturatedCast(config.longPollWaitTimeout().getSeconds()));
8480
}
8581
try {
8682
return asyncClient.receiveMessage(request.build())
8783
.handle((response, throwable) -> {
8884
if (throwable != null) {
8985
setException(throwable);
9086
} else {
91-
messages = new ArrayList<>(response.messages());
87+
messages = new CopyOnWriteArrayList<>(response.messages());
9288
}
93-
open.set(true);
9489
return this;
9590
});
9691
} finally {
@@ -104,7 +99,6 @@ public boolean isEmpty() {
10499
}
105100

106101
public Throwable getException() {
107-
checkIfOpen();
108102
return exception;
109103
}
110104

@@ -113,7 +107,6 @@ public void setException(Throwable exception) {
113107
}
114108

115109
public Message removeMessage() {
116-
checkIfOpen();
117110
if (isExpired()) {
118111
clear();
119112
return null;
@@ -161,12 +154,6 @@ private CompletableFuture<ChangeMessageVisibilityBatchResponse> nackMessages() {
161154
return asyncClient.changeMessageVisibilityBatch(batchRequest);
162155
}
163156

164-
private void checkIfOpen() {
165-
if (!open.get()) {
166-
throw new IllegalStateException("Batch is not open");
167-
}
168-
}
169-
170157
public Integer messagesSize() {
171158
return messages != null ? messages.size() : 0;
172159
}

0 commit comments

Comments
 (0)