Skip to content

receiveNoWait() without prefetching #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
*/
package com.amazon.sqs.javamessaging;

import java.net.URI;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -218,7 +218,7 @@ public void run() {
}

if (!isClosed()) {
messages = getMessages(prefetchBatchSize);
messages = getMessagesWithBackoff(prefetchBatchSize);
}

if (messages != null && !messages.isEmpty()) {
Expand All @@ -240,38 +240,24 @@ public void run() {
}

/**
* Call <code>receiveMessage</code> with long-poll wait time of 20 seconds
* with available prefetch batch size and potential re-tries.
* Call <code>receiveMessage</code> with the given wait time.
*/
protected List<Message> getMessages(int prefetchBatchSize) throws InterruptedException {
protected List<Message> getMessages(int batchSize, int waitTimeSeconds) throws JMSException {

assert prefetchBatchSize > 0;
assert batchSize > 0;

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl)
.withMaxNumberOfMessages(prefetchBatchSize)
.withMaxNumberOfMessages(batchSize)
.withAttributeNames(ALL)
.withMessageAttributeNames(ALL)
.withWaitTimeSeconds(WAIT_TIME_SECONDS);
.withWaitTimeSeconds(waitTimeSeconds);
//if the receive request is for FIFO queue, provide a unique receive request attempt it, so that
//failed calls retried by SDK will claim the same messages
if (sqsDestination.isFifo()) {
receiveMessageRequest.withReceiveRequestAttemptId(UUID.randomUUID().toString());
}
List<Message> messages = null;
try {
ReceiveMessageResult receivedMessageResult = amazonSQSClient.receiveMessage(receiveMessageRequest);
messages = receivedMessageResult.getMessages();
retriesAttempted = 0;
} catch (JMSException e) {
LOG.warn("Encountered exception during receive in ConsumerPrefetch thread", e);
try {
sleep(backoffStrategy.delayBeforeNextRetry(retriesAttempted++));
} catch (InterruptedException ex) {
LOG.warn("Interrupted while retrying on receive", ex);
throw ex;
}
}
return messages;
ReceiveMessageResult receivedMessageResult = amazonSQSClient.receiveMessage(receiveMessageRequest);
return receivedMessageResult.getMessages();
}

/**
Expand All @@ -288,6 +274,7 @@ protected void processReceivedMessages(List<Message> messages) {
javax.jms.Message jmsMessage = convertToJMSMessage(message);
messageManagers.add(new MessageManager(this, jmsMessage));
} catch (JMSException e) {
LOG.warn("Caught exception while converting received messages", e);
nackMessages.add(message.getReceiptHandle());
}
}
Expand All @@ -311,6 +298,23 @@ protected void processReceivedMessages(List<Message> messages) {
}
}

protected List<Message> getMessagesWithBackoff(int batchSize) throws InterruptedException {
try {
List<Message> result = getMessages(batchSize, WAIT_TIME_SECONDS);
retriesAttempted = 0;
return result;
} catch (JMSException e) {
LOG.warn("Encountered exception during receive in ConsumerPrefetch thread", e);
try {
sleep(backoffStrategy.delayBeforeNextRetry(retriesAttempted++));
return Collections.emptyList();
} catch (InterruptedException ex) {
LOG.warn("Interrupted while retrying on receive", ex);
throw ex;
}
}
}

protected void waitForPrefetch() throws InterruptedException {
synchronized (stateLock) {
while (numberOfMessagesToFetch() <= 0 && !isClosed()) {
Expand Down Expand Up @@ -524,6 +528,12 @@ javax.jms.Message receiveNoWait() throws JMSException {

MessageManager messageManager;
synchronized (stateLock) {
if (messageQueue.isEmpty() && numberOfMessagesToPrefetch == 0) {
List<Message> messages = getMessages(1, 0);
if (messages != null && !messages.isEmpty()) {
processReceivedMessages(messages);
}
}
messageManager = messageQueue.pollFirst();
}
if (messageManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1192,9 +1192,7 @@ private static String getType(Object value) throws JMSException {
}

private static Object getObjectValue(String value, String type) throws JMSException {
if (STRING.equals(type) || NUMBER.equals(type)) {
return value;
} else if (INT.equals(type)) {
if (INT.equals(type)) {
return Integer.valueOf(value);
} else if (LONG.equals(type)) {
return Long.valueOf(value);
Expand All @@ -1211,6 +1209,8 @@ private static Object getObjectValue(String value, String type) throws JMSExcept
return Float.valueOf(value);
} else if (SHORT.equals(type)) {
return Short.valueOf(value);
} else if (type != null && (type.startsWith(STRING) || type.startsWith(NUMBER))) {
return value;
} else {
throw new JMSException(type + " is not a supported JMS property type");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ public void testStopWhenConsumerClosedDuringWaitForPrefetch() throws Interrupted
verify(consumerPrefetch, times(2)).nackQueueMessages();

// Ensure we do not get messages when closed while waiting for prefetch
verify(consumerPrefetch, never()).getMessages(anyInt());
verify(consumerPrefetch, never()).getMessages(anyInt(), anyInt());

// Ensure we do not process any messages
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));
Expand Down Expand Up @@ -296,7 +296,7 @@ public void testStopAfterInterruptWaitForStart() throws InterruptedException, JM
verify(consumerPrefetch).nackQueueMessages();

verify(consumerPrefetch, never()).waitForPrefetch();
verify(consumerPrefetch, never()).getMessages(anyInt());
verify(consumerPrefetch, never()).getMessages(anyInt(), anyInt());
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));

// Ensure retries attempt was not increased
Expand Down Expand Up @@ -335,7 +335,7 @@ public void testStopAfterErrorWaitForStart() throws InterruptedException, JMSExc
verify(consumerPrefetch).nackQueueMessages();

verify(consumerPrefetch, never()).waitForPrefetch();
verify(consumerPrefetch, never()).getMessages(anyInt());
verify(consumerPrefetch, never()).getMessages(anyInt(), anyInt());
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));

// Ensure retries attempt was not increased
Expand Down Expand Up @@ -371,7 +371,7 @@ public void testStopAfterInterruptWaitForPrefetch() throws InterruptedException,
verify(consumerPrefetch).waitForPrefetch();
verify(consumerPrefetch).nackQueueMessages();

verify(consumerPrefetch, never()).getMessages(anyInt());
verify(consumerPrefetch, never()).getMessages(anyInt(), anyInt());
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));

// Ensure retries attempt was not increased
Expand Down Expand Up @@ -412,7 +412,7 @@ public void testStopAfterErrorWaitForPrefetch() throws InterruptedException, JMS
verify(consumerPrefetch).waitForPrefetch();
verify(consumerPrefetch).nackQueueMessages();

verify(consumerPrefetch, never()).getMessages(anyInt());
verify(consumerPrefetch, never()).getMessages(anyInt(), anyInt());
verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));

// Ensure retries attempt was not increased
Expand All @@ -435,7 +435,7 @@ public void testStopAfterInterruptGetMessages() throws InterruptedException, JMS
doNothing()
.when(consumerPrefetch).waitForPrefetch();
doThrow(new InterruptedException("Interrupt"))
.when(consumerPrefetch).getMessages(anyInt());
.when(consumerPrefetch).getMessagesWithBackoff(anyInt());

/*
* Run the prefetch
Expand All @@ -449,7 +449,7 @@ public void testStopAfterInterruptGetMessages() throws InterruptedException, JMS
verify(consumerPrefetch).waitForStart();
verify(consumerPrefetch).waitForPrefetch();
verify(consumerPrefetch).nackQueueMessages();
verify(consumerPrefetch).getMessages(anyInt());
verify(consumerPrefetch).getMessagesWithBackoff(anyInt());

verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));

Expand All @@ -473,7 +473,7 @@ public void testStopAfterErrorGetMessages() throws InterruptedException, JMSExce
doNothing()
.when(consumerPrefetch).waitForPrefetch();
doThrow(new Error("error"))
.when(consumerPrefetch).getMessages(anyInt());
.when(consumerPrefetch).getMessages(anyInt(), anyInt());

/*
* Run the prefetch
Expand All @@ -492,7 +492,7 @@ public void testStopAfterErrorGetMessages() throws InterruptedException, JMSExce
verify(consumerPrefetch).waitForStart();
verify(consumerPrefetch).waitForPrefetch();
verify(consumerPrefetch).nackQueueMessages();
verify(consumerPrefetch).getMessages(anyInt());
verify(consumerPrefetch).getMessages(anyInt(), anyInt());

verify(consumerPrefetch, never()).processReceivedMessages(any(List.class));

Expand Down Expand Up @@ -1332,6 +1332,11 @@ public void testReceiveNoWaitEmpty() throws InterruptedException, JMSException {
*/
consumerPrefetch.running = true;

if (numberOfMessagesToPrefetch == 0) {
when(amazonSQSClient.receiveMessage(any(ReceiveMessageRequest.class)))
.thenReturn(new ReceiveMessageResult());
}

/*
* Call receive messages
*/
Expand Down Expand Up @@ -1542,7 +1547,7 @@ public void testGetMessages() throws InterruptedException, JMSException {
/*
* Get messages
*/
List<com.amazonaws.services.sqs.model.Message> result = consumerPrefetch.getMessages(prefetchBatchSize);
List<com.amazonaws.services.sqs.model.Message> result = consumerPrefetch.getMessagesWithBackoff(prefetchBatchSize);

/*
* Verify results
Expand All @@ -1555,18 +1560,18 @@ public void testGetMessages() throws InterruptedException, JMSException {
* Test Get Messages with illegal prefetch size
*/
@Test
public void testGetMessagesIllegalPrefetchSize() throws InterruptedException {
public void testGetMessagesIllegalPrefetchSize() throws JMSException {

int negativeSize = -10;
try {
consumerPrefetch.getMessages(negativeSize);
consumerPrefetch.getMessages(negativeSize, 0);
fail();
} catch(AssertionError ae) {
// expected exception
}

try {
consumerPrefetch.getMessages(0);
consumerPrefetch.getMessages(0, 0);
fail();
} catch(AssertionError ae) {
// expected exception
Expand Down Expand Up @@ -1597,9 +1602,9 @@ public void testGetMessagesJMSException() throws InterruptedException, JMSExcept
when(backoffStrategy.delayBeforeNextRetry(retriesAttempted + 1))
.thenReturn(secondSleepTime);

consumerPrefetch.getMessages(prefetchBatchSize);
consumerPrefetch.getMessagesWithBackoff(prefetchBatchSize);

consumerPrefetch.getMessages(prefetchBatchSize);
consumerPrefetch.getMessagesWithBackoff(prefetchBatchSize);

/*
* Verify results
Expand Down Expand Up @@ -1638,7 +1643,7 @@ public void testGetMessagesInterruptDuringBackoff() throws InterruptedException,
public void run() {
try {
beforeGetMessagesCall.countDown();
consumerPrefetch.getMessages(prefetchBatchSize);
consumerPrefetch.getMessagesWithBackoff(prefetchBatchSize);
} catch (InterruptedException e) {
recvInterruptedExceptionLatch.countDown();
e.printStackTrace();
Expand Down Expand Up @@ -1672,7 +1677,7 @@ public void testGetMessagesError() throws InterruptedException, JMSException {
.thenThrow(new Error());

try {
consumerPrefetch.getMessages(prefetchBatchSize);
consumerPrefetch.getMessages(prefetchBatchSize, 0);
} catch (Error e) {
// Expected error exception
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class SQSMessageTest {
final String myShort = "myShort";
final String myByte = "myByte";
final String myString = "myString";
final String myCustomString = "myCustomString";
final String myNumber = "myNumber";

@Before
Expand Down Expand Up @@ -326,6 +327,10 @@ public void testSQSMessageAttributeToProperty() throws JMSException {
.withDataType(SQSMessagingClientConstants.STRING)
.withStringValue("StringValue"));

messageAttributes.put(myCustomString, new MessageAttributeValue()
.withDataType(SQSMessagingClientConstants.NUMBER + ".custom")
.withStringValue("['one', 'two']"));

messageAttributes.put(myNumber, new MessageAttributeValue()
.withDataType(SQSMessagingClientConstants.NUMBER)
.withStringValue("500"));
Expand Down Expand Up @@ -374,6 +379,10 @@ public void testSQSMessageAttributeToProperty() throws JMSException {
Assert.assertEquals(message.getObjectProperty(myString), "StringValue");
Assert.assertEquals(message.getStringProperty(myString), "StringValue");

Assert.assertTrue(message.propertyExists(myCustomString));
Assert.assertEquals(message.getObjectProperty(myCustomString), "['one', 'two']");
Assert.assertEquals(message.getStringProperty(myCustomString), "['one', 'two']");

Assert.assertTrue(message.propertyExists(myNumber));
Assert.assertEquals(message.getObjectProperty(myNumber), "500");
Assert.assertEquals(message.getStringProperty(myNumber), "500");
Expand All @@ -395,6 +404,7 @@ public void testSQSMessageAttributeToProperty() throws JMSException {
myShort,
myByte,
myString,
myCustomString,
myNumber,
JMSX_DELIVERY_COUNT));

Expand Down