Skip to content

Add support for MessageProducer.setDeliveryDelay() (from JMS 2.0) #76

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/main/java/com/amazon/sqs/javamessaging/SQSMessageProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import javax.jms.Destination;
Expand Down Expand Up @@ -55,6 +56,10 @@
public class SQSMessageProducer implements MessageProducer, QueueSender {
private static final Log LOG = LogFactory.getLog(SQSMessageProducer.class);

private long MAXIMUM_DELIVERY_DELAY_MILLISECONDS = TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);

private int deliveryDelaySeconds = 0;

/** This field is not actually used. */
private long timeToLive;
/** This field is not actually used. */
Expand Down Expand Up @@ -122,6 +127,10 @@ void sendInternal(SQSQueueDestination queue, Message rawMessage) throws JMSExcep
SendMessageRequest sendMessageRequest = new SendMessageRequest(queue.getQueueUrl(), sqsMessageBody);
sendMessageRequest.setMessageAttributes(messageAttributes);

if (deliveryDelaySeconds != 0) {
sendMessageRequest.setDelaySeconds(deliveryDelaySeconds);
}

//for FIFO queues, we have to specify both MessageGroupId, which we obtain from standard property JMSX_GROUP_ID
//and MessageDeduplicationId, which we obtain from a custom provider specific property JMS_SQS_DEDUPLICATION_ID
//notice that this code does not validate if the values are actually set by the JMS user
Expand Down Expand Up @@ -493,7 +502,32 @@ public void setTimeToLive(long timeToLive) throws JMSException {
public long getTimeToLive() throws JMSException {
return timeToLive;
}

/**
* Sets the minimum length of time in milliseconds that must elapse after a
* message is sent before the JMS provider may deliver the message to a consumer.
* <p>
* This must be a multiple of 1000, since SQS only supports delivery delays
* in seconds.
*/
public void setDeliveryDelay(long deliveryDelay) {
if (deliveryDelay < 0 || deliveryDelay > MAXIMUM_DELIVERY_DELAY_MILLISECONDS) {
throw new IllegalArgumentException("Delivery delay must be non-negative and at most 15 minutes: " + deliveryDelay);
}
if (deliveryDelay % 1000 != 0) {
throw new IllegalArgumentException("Delivery delay must be a multiple of 1000: " + deliveryDelay);
}
this.deliveryDelaySeconds = (int)(deliveryDelay / 1000);
}

/**
* Gets the minimum length of time in milliseconds that must elapse after a
* message is sent before the JMS provider may deliver the message to a consumer.
*/
public long getDeliveryDelay() {
return deliveryDelaySeconds * 1000;
}

void checkClosed() throws IllegalStateException {
if (closed.get()) {
throw new IllegalStateException("The producer is closed.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -699,6 +701,50 @@ public void testClose() throws JMSException {
verify(sqsSession).removeProducer(producer);
}

@Test
public void testSetDeliveryDelay() throws JMSException {
assertEquals(0, producer.getDeliveryDelay());

producer.setDeliveryDelay(2000);

assertEquals(2000, producer.getDeliveryDelay());

ArgumentCaptor<SendMessageRequest> requestCaptor = ArgumentCaptor.forClass(SendMessageRequest.class);
when(amazonSQSClient.sendMessage(requestCaptor.capture()))
.thenReturn(new SendMessageResult().withMessageId(MESSAGE_ID_1));

SQSTextMessage msg = new SQSTextMessage("Sorry I'm late!");
producer.send(msg);

assertEquals(2, requestCaptor.getValue().getDelaySeconds().intValue());
}


@Test
public void testSetDeliveryDelayInvalidDelays() throws JMSException {
try {
producer.setDeliveryDelay(-1);
fail();
} catch (IllegalArgumentException ide) {
// expected
}

try {
producer.setDeliveryDelay(TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS));
fail();
} catch (IllegalArgumentException ide) {
// expected
}

try {
producer.setDeliveryDelay(20);
fail();
} catch (IllegalArgumentException ide) {
// expected
}
}


private Map<String, MessageAttributeValue> createMessageAttribute(String type) {
MessageAttributeValue messageAttributeValue = new MessageAttributeValue();
messageAttributeValue.setDataType("String");
Expand Down