Skip to content

Commit 8d797c0

Browse files
authored
Merge pull request #199 from rabbitmq/add-durable-flag
Make message durability configurable
2 parents 7f657a4 + 38572f8 commit 8d797c0

File tree

7 files changed

+141
-6
lines changed

7 files changed

+141
-6
lines changed

src/main/java/com/rabbitmq/client/amqp/Message.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,19 @@ public interface Message {
591591
*/
592592
byte[] body();
593593

594+
/**
595+
* Mark the message as durable or not.
596+
*
597+
* <p>Messages are durable by default, use <code>false</code> to make them explicitly non-durable.
598+
*
599+
* <p>Durability depends also on the queue messages end up in (e.g. quorum queues and streams
600+
* always store messages durably).
601+
*
602+
* @param durable true for a durable message, false for a non-durable message
603+
* @return the message
604+
*/
605+
Message durable(boolean durable);
606+
594607
/**
595608
* Whether the message is durable.
596609
*

src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ final class AmqpMessage implements Message {
3535

3636
private final org.apache.qpid.protonj2.client.Message<byte[]> delegate;
3737

38+
private boolean durableIsSet = false;
39+
3840
AmqpMessage() {
3941
this(org.apache.qpid.protonj2.client.Message.create(EMPTY_BODY));
4042
}
@@ -455,6 +457,14 @@ public byte[] body() {
455457
}
456458

457459
// header section
460+
461+
@Override
462+
public Message durable(boolean durable) {
463+
this.durableIsSet = true;
464+
callOnDelegate(m -> m.durable(durable));
465+
return this;
466+
}
467+
458468
@Override
459469
public boolean durable() {
460470
return returnFromDelegate(org.apache.qpid.protonj2.client.Message::durable);
@@ -535,6 +545,13 @@ public MessageAddressBuilder replyToAddress() {
535545
return new DefaultMessageAddressBuilder(this, DefaultMessageAddressBuilder.REPLY_TO_CALLBACK);
536546
}
537547

548+
AmqpMessage enforceDurability() throws ClientException {
549+
if (!this.durableIsSet) {
550+
this.delegate.durable(true);
551+
}
552+
return this;
553+
}
554+
538555
private static class DefaultMessageAddressBuilder
539556
extends DefaultAddressBuilder<MessageAddressBuilder> implements MessageAddressBuilder {
540557

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,7 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
8686
this.publishCall =
8787
msg -> {
8888
try {
89-
org.apache.qpid.protonj2.client.Message<?> nativeMessage =
90-
((AmqpMessage) msg).nativeMessage();
91-
return this.sender.send(nativeMessage.durable(true));
89+
return this.doSend((AmqpMessage) msg);
9290
} catch (ClientIllegalStateException e) {
9391
// the link is closed
9492
LOGGER.debug("Error while publishing: '{}'. Closing publisher.", e.getMessage());
@@ -154,6 +152,11 @@ private Status mapDeliveryState(DeliveryState in) {
154152
}
155153
}
156154

155+
private Tracker doSend(AmqpMessage msg) throws ClientException {
156+
msg.enforceDurability();
157+
return this.sender.send(msg.nativeMessage());
158+
}
159+
157160
private static MetricsCollector.PublishDisposition mapToPublishDisposition(Status status) {
158161
if (status == Status.ACCEPTED) {
159162
return MetricsCollector.PublishDisposition.ACCEPTED;

src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,32 @@ public class AmqpMessageTest {
2525

2626
@Test
2727
void toShouldBePathEncoded() {
28-
assertThat(new AmqpMessage().toAddress().exchange("foo bar").message().to())
28+
assertThat(msg().toAddress().exchange("foo bar").message().to())
2929
.isEqualTo("/exchanges/foo%20bar");
3030
}
3131

3232
@Test
3333
void replyToShouldBePathEncoded() {
34-
assertThat(new AmqpMessage().replyToAddress().exchange("foo bar").message().replyTo())
34+
assertThat(msg().replyToAddress().exchange("foo bar").message().replyTo())
3535
.isEqualTo("/exchanges/foo%20bar");
3636
}
37+
38+
@Test
39+
void shouldBeNonDurableOnlyIfExplicitlySet() throws Exception {
40+
AmqpMessage msg = msg();
41+
// durable by default
42+
assertThat(msg.enforceDurability().nativeMessage().durable()).isTrue();
43+
// non-durable explicitly set
44+
msg = msg();
45+
msg.durable(false);
46+
assertThat(msg.enforceDurability().nativeMessage().durable()).isFalse();
47+
// durable explicitly set
48+
msg = msg();
49+
msg.durable(true);
50+
assertThat(msg.enforceDurability().nativeMessage().durable()).isTrue();
51+
}
52+
53+
private static AmqpMessage msg() {
54+
return new AmqpMessage();
55+
}
3756
}

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import static com.rabbitmq.client.amqp.Management.QueueType.STREAM;
2424
import static com.rabbitmq.client.amqp.impl.Assertions.assertThat;
2525
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_0_3;
26+
import static com.rabbitmq.client.amqp.impl.TestConditions.BrokerVersion.RABBITMQ_4_2_0;
2627
import static com.rabbitmq.client.amqp.impl.TestUtils.*;
2728
import static com.rabbitmq.client.amqp.impl.Utils.threadFactory;
2829
import static java.nio.charset.StandardCharsets.*;
@@ -36,6 +37,7 @@
3637

3738
import com.rabbitmq.client.amqp.AmqpException;
3839
import com.rabbitmq.client.amqp.Connection;
40+
import com.rabbitmq.client.amqp.ConsumerBuilder;
3941
import com.rabbitmq.client.amqp.Environment;
4042
import com.rabbitmq.client.amqp.Management;
4143
import com.rabbitmq.client.amqp.Message;
@@ -55,6 +57,7 @@
5557
import org.junit.jupiter.api.*;
5658
import org.junit.jupiter.params.ParameterizedTest;
5759
import org.junit.jupiter.params.provider.CsvSource;
60+
import org.junit.jupiter.params.provider.EnumSource;
5861
import org.junit.jupiter.params.provider.ValueSource;
5962

6063
@AmqpTestInfrastructure
@@ -906,4 +909,76 @@ void messageAnnotationsSupportListMapArray() {
906909
ints = (int[]) m.annotation("x-arrayInt");
907910
org.assertj.core.api.Assertions.assertThat(ints).containsExactly(4, 5, 6);
908911
}
912+
913+
@ParameterizedTest
914+
@CsvSource({
915+
"CLASSIC,true",
916+
"CLASSIC,false",
917+
"QUORUM,true",
918+
"QUORUM,false",
919+
"STREAM,true",
920+
"STREAM,false"
921+
})
922+
@BrokerVersionAtLeast(RABBITMQ_4_2_0)
923+
void explicitDurabilityShouldBeEnforced(Management.QueueType type, boolean durable) {
924+
try {
925+
connection.management().queue(this.name).type(type).declare();
926+
Publisher p = connection.publisherBuilder().queue(this.name).build();
927+
p.publish(p.message().durable(durable), ctx -> {});
928+
929+
Sync consumeSync = sync();
930+
AtomicReference<Message> messageRef = new AtomicReference<>();
931+
ConsumerBuilder builder =
932+
connection
933+
.consumerBuilder()
934+
.queue(this.name)
935+
.messageHandler(
936+
(context, message) -> {
937+
messageRef.set(message);
938+
context.accept();
939+
consumeSync.down();
940+
});
941+
if (type == STREAM) {
942+
builder.stream().offset(ConsumerBuilder.StreamOffsetSpecification.FIRST);
943+
}
944+
builder.build();
945+
assertThat(consumeSync).completes();
946+
Message message = messageRef.get();
947+
assertThat(message).isDurable(durable);
948+
} finally {
949+
connection.management().queueDelete(this.name);
950+
}
951+
}
952+
953+
@ParameterizedTest
954+
@EnumSource(Management.QueueType.class)
955+
void durableByDefault(Management.QueueType type) {
956+
try {
957+
connection.management().queue(this.name).type(type).declare();
958+
Publisher p = connection.publisherBuilder().queue(this.name).build();
959+
p.publish(p.message(), ctx -> {});
960+
961+
Sync consumeSync = sync();
962+
AtomicReference<Message> messageRef = new AtomicReference<>();
963+
ConsumerBuilder builder =
964+
connection
965+
.consumerBuilder()
966+
.queue(this.name)
967+
.messageHandler(
968+
(context, message) -> {
969+
messageRef.set(message);
970+
context.accept();
971+
consumeSync.down();
972+
});
973+
if (type == STREAM) {
974+
builder.stream().offset(ConsumerBuilder.StreamOffsetSpecification.FIRST);
975+
}
976+
builder.build();
977+
assertThat(consumeSync).completes();
978+
Message message = messageRef.get();
979+
assertThat(message).isDurable(true);
980+
} finally {
981+
connection.management().queueDelete(this.name);
982+
}
983+
}
909984
}

src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,13 @@ private MessageAssert hasField(String fieldLabel, Object value, Object expected)
359359
.isEqualTo(expected);
360360
return this;
361361
}
362+
363+
void isDurable(boolean durable) {
364+
isNotNull();
365+
if (actual.durable() != durable) {
366+
fail("Message durable flag should be %s but is %s", durable, actual.durable());
367+
}
368+
}
362369
}
363370

364371
static class ConnectionAssert extends AbstractObjectAssert<ConnectionAssert, AmqpConnection> {

src/test/java/com/rabbitmq/client/amqp/impl/TestConditions.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ private TestConditions() {}
3535

3636
public enum BrokerVersion {
3737
RABBITMQ_4_0_3("4.0.3"),
38-
RABBITMQ_4_1_0("4.1.0");
38+
RABBITMQ_4_1_0("4.1.0"),
39+
RABBITMQ_4_2_0("4.2.0");
3940

4041
final String value;
4142

0 commit comments

Comments
 (0)