Skip to content

Commit 1a0b987

Browse files
committed
SdkClosable implemented
1 parent f510b82 commit 1a0b987

File tree

5 files changed

+16
-20
lines changed

5 files changed

+16
-20
lines changed

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/QueueAttributesManager.java

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,22 +113,19 @@ private CompletableFuture<Map<QueueAttributeName, String>> getAttributeMap() {
113113
CompletableFuture<Map<QueueAttributeName, String>> newFuture = new CompletableFuture<>();
114114

115115
if (queueAttributeMap.compareAndSet(future, newFuture)) {
116-
// Only one thread will execute this block and fetch the attributes.
117116
fetchQueueAttributes().whenComplete((r, t) -> {
118117
if (t != null) {
119-
newFuture.completeExceptionally(t); // Complete the future exceptionally
118+
newFuture.completeExceptionally(t);
120119
} else {
121-
newFuture.complete(r); // Complete the future with the result
120+
newFuture.complete(r);
122121
}
123122
});
124123
return newFuture;
125124
} else {
126-
// If another thread already set the future, cancel this one and use the existing one.
127125
newFuture.cancel(true);
128-
return queueAttributeMap.get(); // Return the future set by the winning thread
126+
return queueAttributeMap.get();
129127
}
130128
}
131-
132129
return future;
133130
}
134131

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveBatchManager.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@
2424
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2525
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
2626
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
27+
import software.amazon.awssdk.utils.SdkAutoCloseable;
2728

2829
@SdkInternalApi
29-
public class ReceiveBatchManager {
30+
public class ReceiveBatchManager implements SdkAutoCloseable {
3031

3132
private final SqsAsyncClient sqsClient;
3233
private final ScheduledExecutorService executor;
@@ -62,8 +63,8 @@ public CompletableFuture<ReceiveMessageResponse> processRequest(ReceiveMessageRe
6263
});
6364
}
6465

65-
public void shutdown() {
66-
receiveQueueBuffer.shutdown();
67-
executor.shutdown();
66+
@Override
67+
public void close() {
68+
receiveQueueBuffer.close();
6869
}
6970
}

services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/ReceiveQueueBuffer.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,10 @@
2828
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2929
import software.amazon.awssdk.services.sqs.model.Message;
3030
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
31+
import software.amazon.awssdk.utils.SdkAutoCloseable;
3132

3233
@SdkInternalApi
33-
public class ReceiveQueueBuffer {
34+
public class ReceiveQueueBuffer implements SdkAutoCloseable {
3435

3536
private final ScheduledExecutorService executor;
3637
private final SqsAsyncClient sqsClient;
@@ -66,9 +67,9 @@ public boolean isShutDown() {
6667
return shutDown.get();
6768
}
6869

69-
public void shutdown() {
70+
@Override
71+
public void close() {
7072
if (this.shutDown.compareAndSet(false, true)) {
71-
// Clear all finished tasks
7273
while (!finishedTasks.isEmpty()) {
7374
ReceiveSqsMessageHelper batch = finishedTasks.poll();
7475
if (inflightReceiveMessageBatches.get() > 0) {
@@ -78,8 +79,6 @@ public void shutdown() {
7879
batch.clear();
7980
}
8081
}
81-
82-
// Clear futures
8382
futures.forEach(futureWrapper -> {
8483
if (!futureWrapper.getFuture().isDone()) {
8584
futureWrapper.getFuture().completeExceptionally(new CancellationException("Shutdown in progress"));
@@ -190,7 +189,6 @@ private void reportBatchFinished(ReceiveSqsMessageHelper batch) {
190189
spawnMoreReceiveTasks();
191190
}
192191

193-
194192
private static class FutureRequestWrapper {
195193
private final CompletableFuture<ReceiveMessageResponse> future;
196194
private final int requestedSize;

services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveBatchManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public void testShutdown() throws Exception {
165165
ReceiveMessageRequest request = ReceiveMessageRequest.builder().maxNumberOfMessages(10).build();
166166
CompletableFuture<ReceiveMessageResponse> futureResponse = receiveBatchManager.processRequest(request);
167167

168-
receiveBatchManager.shutdown();
168+
receiveBatchManager.close();
169169

170170
assertThrows(IllegalStateException.class, () -> receiveBatchManager.processRequest(request));
171171

services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/ReceiveQueueBufferTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ public void receiveMessageShutDown() throws Exception {
298298
receiveQueueBuffer.receiveMessage(future, 10);
299299

300300
// Shutdown receiveQueueBuffer
301-
receiveQueueBuffer.shutdown();
301+
receiveQueueBuffer.close();
302302

303303
// Verify that the future is completed exceptionally
304304
assertTrue(future.isCompletedExceptionally());
@@ -441,7 +441,7 @@ public void testShutdownExceptionallyCompletesAllIncompleteFutures() throws Exce
441441
}
442442

443443
// Shutdown the queue buffer and assert no exceptions are thrown
444-
assertDoesNotThrow(() -> receiveQueueBuffer.shutdown());
444+
assertDoesNotThrow(() -> receiveQueueBuffer.close());
445445

446446
// Verify that each future completes exceptionally with CancellationException
447447
for (CompletableFuture<ReceiveMessageResponse> future : futures) {
@@ -477,7 +477,7 @@ public void visibilityTimeOutErrorsAreLogged() throws Exception {
477477

478478
try (LogCaptor logCaptor = LogCaptor.create(Level.DEBUG)) {
479479
// Shutdown the receiveQueueBuffer to trigger the visibility timeout errors
480-
assertDoesNotThrow(() -> receiveQueueBuffer.shutdown());
480+
assertDoesNotThrow(receiveQueueBuffer::close);
481481

482482
// Verify that an error was logged for failing to change visibility timeout
483483
assertThat(logCaptor.loggedEvents()).anySatisfy(logEvent -> {

0 commit comments

Comments
 (0)