Skip to content

Receive Batch Manager Implementation #5488

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
import software.amazon.awssdk.services.sqs.batchmanager.SqsAsyncBatchManager;
Expand All @@ -43,6 +42,8 @@ public final class DefaultSqsAsyncBatchManager implements SqsAsyncBatchManager {

private final ChangeMessageVisibilityBatchManager changeMessageVisibilityBatchManager;

private final ReceiveMessageBatchManager receiveMessageBatchManager;

private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
this.client = Validate.notNull(builder.client, "client cannot be null");

Expand All @@ -57,19 +58,9 @@ private DefaultSqsAsyncBatchManager(DefaultBuilder builder) {
this.changeMessageVisibilityBatchManager = new ChangeMessageVisibilityBatchManager(builder.overrideConfiguration,
scheduledExecutor,
client);
//TODO : this will be updated while implementing the Receive Message Batch Manager
}

@SdkTestInternalApi
public DefaultSqsAsyncBatchManager(
SqsAsyncClient client,
SendMessageBatchManager sendMessageBatchManager,
DeleteMessageBatchManager deleteMessageBatchManager,
ChangeMessageVisibilityBatchManager changeMessageVisibilityBatchManager) {
this.sendMessageBatchManager = sendMessageBatchManager;
this.deleteMessageBatchManager = deleteMessageBatchManager;
this.changeMessageVisibilityBatchManager = changeMessageVisibilityBatchManager;
this.client = client;
this.receiveMessageBatchManager = new ReceiveMessageBatchManager(client, scheduledExecutor,
builder.overrideConfiguration);
}

@Override
Expand All @@ -89,7 +80,7 @@ public CompletableFuture<ChangeMessageVisibilityResponse> changeMessageVisibilit

@Override
public CompletableFuture<ReceiveMessageResponse> receiveMessage(ReceiveMessageRequest request) {
return null;
return this.receiveMessageBatchManager.batchRequest(request);
}

public static SqsAsyncBatchManager.Builder builder() {
Expand All @@ -101,6 +92,7 @@ public void close() {
sendMessageBatchManager.close();
deleteMessageBatchManager.close();
changeMessageVisibilityBatchManager.close();
receiveMessageBatchManager.close();
}

public static final class DefaultBuilder implements SqsAsyncBatchManager.Builder {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.utils.CompletableFutureUtils;
import software.amazon.awssdk.utils.Validate;


/**
* The {@code QueueAttributesManager} class is responsible for managing and retrieving specific attributes
* of an AWS SQS queue, such as message wait time and visibility timeout. It efficiently caches these attributes
* to minimize redundant API calls to SQS, ensuring that the attributes are fetched only once and reused in subsequent requests.
*
* <p>This class uses an {@link AtomicReference} to maintain the state of the attribute map, allowing concurrent access
* and handling cases where the fetching of attributes may fail. If an error occurs during the retrieval of attributes,
* the state is reset to allow for a fresh attempt in subsequent calls.</p>
*
* <p>The class provides methods to get the visibility timeout and calculate the message receive timeout, which
* are asynchronously retrieved and processed using {@link CompletableFuture}. These methods handle cancellation
* scenarios by cancelling the SQS request if the calling future is cancelled.</p>
*
* <p>This class is intended for internal use and is marked with the {@link SdkInternalApi} annotation.</p>
*/
@SdkInternalApi
public final class QueueAttributesManager {

private static final List<QueueAttributeName> QUEUE_ATTRIBUTE_NAMES =
Arrays.asList(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS,
QueueAttributeName.VISIBILITY_TIMEOUT);
private final SqsAsyncClient sqsClient;
private final String queueUrl;
private final AtomicReference<CompletableFuture<Map<QueueAttributeName, String>>> queueAttributeMap = new AtomicReference<>();

public QueueAttributesManager(SqsAsyncClient sqsClient, String queueUrl) {
this.sqsClient = sqsClient;
this.queueUrl = queueUrl;
}

/**
* Retrieves the received message timeout based on the provided request and queue attributes.
*
* @param rq The receive message request
* @param configuredWaitTime The configured minimum wait time
* @return CompletableFuture with the calculated receive message timeout in milliseconds
*/
public CompletableFuture<Duration> getReceiveMessageTimeout(ReceiveMessageRequest rq, Duration configuredWaitTime) {
Integer waitTimeSeconds = rq.waitTimeSeconds();
if (waitTimeSeconds != null) {
long waitTimeMillis = TimeUnit.SECONDS.toMillis(waitTimeSeconds);
return CompletableFuture.completedFuture(Duration.ofMillis(Math.max(configuredWaitTime.toMillis(), waitTimeMillis)));
}

CompletableFuture<Map<QueueAttributeName, String>> attributeFuture = getAttributeMap();
CompletableFuture<Duration> resultFuture = attributeFuture.thenApply(attributes -> {
String waitTimeSecondsStr = attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS);
long waitTimeFromSqsMillis = TimeUnit.SECONDS.toMillis(Long.parseLong(waitTimeSecondsStr));
return Duration.ofMillis(Math.max(configuredWaitTime.toMillis(), waitTimeFromSqsMillis));
});

return CompletableFutureUtils.forwardExceptionTo(resultFuture, attributeFuture);
}

/**
* Retrieves the visibility timeout for the queue.
*
* @return CompletableFuture with the visibility timeout in nanoseconds
*/
public CompletableFuture<Duration> getVisibilityTimeout() {
CompletableFuture<Map<QueueAttributeName, String>> attributeFuture = getAttributeMap();
CompletableFuture<Duration> resultFuture = attributeFuture.thenApply(attributes -> {
String visibilityTimeoutStr = attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT);
return Duration.ofSeconds(Integer.parseInt(visibilityTimeoutStr));
});

return CompletableFutureUtils.forwardExceptionTo(resultFuture, attributeFuture);
}

/**
* Retrieves the queue attributes based on the predefined attribute names.
*
* @return CompletableFuture with the map of attribute names and their values.
*/
private CompletableFuture<Map<QueueAttributeName, String>> getAttributeMap() {
CompletableFuture<Map<QueueAttributeName, String>> future = queueAttributeMap.get();

if (future == null || future.isCompletedExceptionally()) {
CompletableFuture<Map<QueueAttributeName, String>> newFuture = new CompletableFuture<>();

if (queueAttributeMap.compareAndSet(future, newFuture)) {
fetchQueueAttributes().whenComplete((r, t) -> {
if (t != null) {
newFuture.completeExceptionally(t);
} else {
newFuture.complete(r);
}
});
return newFuture;
} else {
newFuture.cancel(true);
return queueAttributeMap.get();
}
}
return future;
}

/**
* Fetches the queue attributes from SQS and completes the provided future with the result.
*
* @return CompletableFuture with the map of attribute names and values.
*/
private CompletableFuture<Map<QueueAttributeName, String>> fetchQueueAttributes() {
GetQueueAttributesRequest request = GetQueueAttributesRequest.builder()
.queueUrl(queueUrl)
.attributeNames(QUEUE_ATTRIBUTE_NAMES)
.build();

CompletableFuture<Map<QueueAttributeName, String>> future =
sqsClient.getQueueAttributes(request)
.thenApply(response -> {
Map<QueueAttributeName, String> attributes = response.attributes();
Validate.notNull(attributes.get(QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS),
QueueAttributeName.RECEIVE_MESSAGE_WAIT_TIME_SECONDS
+ " attribute is null in SQS.");
Validate.notNull(attributes.get(QueueAttributeName.VISIBILITY_TIMEOUT),
QueueAttributeName.VISIBILITY_TIMEOUT + " attribute is null in SQS.");
return attributes.entrySet().stream()
.filter(entry -> QUEUE_ATTRIBUTE_NAMES.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
});

return future;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import static software.amazon.awssdk.services.sqs.internal.batchmanager.SqsMessageDefault.MAX_SUPPORTED_SQS_RECEIVE_MSG;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.utils.SdkAutoCloseable;

@SdkInternalApi
public class ReceiveBatchManager implements SdkAutoCloseable {

private final SqsAsyncClient sqsClient;
private final ScheduledExecutorService executor;
private final ResponseBatchConfiguration config;
private final String queueUrl;
private final ReceiveQueueBuffer receiveQueueBuffer;
private final QueueAttributesManager queueAttributesManager;

public ReceiveBatchManager(SqsAsyncClient sqsClient, ScheduledExecutorService executor, ResponseBatchConfiguration config,
String queueUrl) {
this.sqsClient = sqsClient;
this.executor = executor;
this.config = config;
this.queueUrl = queueUrl;
this.queueAttributesManager = new QueueAttributesManager(sqsClient, queueUrl);
this.receiveQueueBuffer = ReceiveQueueBuffer.builder()
.executor(executor)
.sqsClient(sqsClient)
.config(config)
.queueUrl(queueUrl)
.queueAttributesManager(queueAttributesManager).build();
}

public CompletableFuture<ReceiveMessageResponse> processRequest(ReceiveMessageRequest rq) {
if (receiveQueueBuffer.isShutDown()) {
throw new IllegalStateException("The client has been shut down.");
}
int numMessages = rq.maxNumberOfMessages() != null ? rq.maxNumberOfMessages() : MAX_SUPPORTED_SQS_RECEIVE_MSG;

return queueAttributesManager.getReceiveMessageTimeout(rq, config.minReceiveWaitTime()).thenCompose(waitTimeMs -> {
CompletableFuture<ReceiveMessageResponse> receiveMessageFuture = new CompletableFuture<>();
receiveQueueBuffer.receiveMessage(receiveMessageFuture, numMessages);
CompletableFuture<ReceiveMessageResponse> timeoutFuture = new CompletableFuture<>();
executor.schedule(() -> timeoutFuture.complete(ReceiveMessageResponse.builder().build()), waitTimeMs.toMillis(),
TimeUnit.MILLISECONDS);
return receiveMessageFuture.applyToEither(timeoutFuture, Function.identity());

});
}

@Override
public void close() {
receiveQueueBuffer.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.sqs.internal.batchmanager;

import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.batchmanager.BatchOverrideConfiguration;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.utils.Logger;
import software.amazon.awssdk.utils.SdkAutoCloseable;

@SdkInternalApi
public class ReceiveMessageBatchManager implements SdkAutoCloseable {

private static final Logger log = Logger.loggerFor(ReceiveMessageBatchManager.class);

private final SqsAsyncClient sqsClient;
private final ScheduledExecutorService executor;
private final ResponseBatchConfiguration config;
private final Map<String, ReceiveBatchManager> receiveBatchManagerMap = new ConcurrentHashMap<>();

public ReceiveMessageBatchManager(SqsAsyncClient sqsClient,
ScheduledExecutorService executor,
BatchOverrideConfiguration config) {
this.sqsClient = sqsClient;
this.executor = executor;
this.config = new ResponseBatchConfiguration(config);
}

public CompletableFuture<ReceiveMessageResponse> batchRequest(ReceiveMessageRequest request) {
if (canBeRetrievedFromQueueBuffer(request)) {
return receiveBatchManagerMap.computeIfAbsent(generateBatchKey(request), key -> createReceiveBatchManager(request))
.processRequest(request);
} else {
log.debug(() -> "canBeRetrievedFromQueueBuffer failed, so skipping batching for request for Queue with URL: "
+ request.queueUrl());
return sqsClient.receiveMessage(request);
}
}

/**
* Generates a unique key for batch processing based on the queue URL and any override configuration.
*
* @param request The receive message request.
* @return The generated batch key.
*/
private String generateBatchKey(ReceiveMessageRequest request) {
return request.overrideConfiguration()
.map(config -> request.queueUrl() + config.hashCode())
.orElse(request.queueUrl());
}

private ReceiveBatchManager createReceiveBatchManager(ReceiveMessageRequest request) {
return new ReceiveBatchManager(sqsClient, executor, config, request.queueUrl());
}

@Override
public void close() {
receiveBatchManagerMap.values().forEach(ReceiveBatchManager::close);
}

private boolean canBeRetrievedFromQueueBuffer(ReceiveMessageRequest rq) {
return hasCompatibleAttributes(rq) && isBufferingEnabled() && rq.visibilityTimeout() == null;
}


private boolean hasCompatibleAttributes(ReceiveMessageRequest rq) {
return !rq.hasAttributeNames()
&& hasCompatibleSystemAttributes(rq)
&& hasCompatibleMessageAttributes(rq);
}

private boolean hasCompatibleSystemAttributes(ReceiveMessageRequest rq) {
return !rq.hasMessageSystemAttributeNames()
|| config.messageSystemAttributeNames().equals(rq.messageSystemAttributeNames());
}

private boolean hasCompatibleMessageAttributes(ReceiveMessageRequest rq) {
return !rq.hasMessageAttributeNames()
|| config.receiveMessageAttributeNames().equals(rq.messageAttributeNames());
}

private boolean isBufferingEnabled() {
return config.maxInflightReceiveBatches() > 0 && config.maxDoneReceiveBatches() > 0;
}

}
Loading
Loading