Skip to content

Commit e1a930b

Browse files
committed
Make message durability configurable
Outbound messages are marked as durable by default. They are marked non-durable only if explicitly set.
1 parent 7f657a4 commit e1a930b

File tree

6 files changed

+137
-5
lines changed

6 files changed

+137
-5
lines changed

src/main/java/com/rabbitmq/client/amqp/Message.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -591,6 +591,19 @@ public interface Message {
591591
*/
592592
byte[] body();
593593

594+
/**
595+
* Mark the message as durable or not.
596+
*
597+
* <p>Messages are durable by default, use <code>false</code> to make them explicitly non-durable.
598+
*
599+
* <p>Durability depends also on the queue messages end up in (e.g. quorum queues and streams
600+
* always store messages durably).
601+
*
602+
* @param durable true for a durable message, false for a non-durable message
603+
* @return the message
604+
*/
605+
Message durable(boolean durable);
606+
594607
/**
595608
* Whether the message is durable.
596609
*

src/main/java/com/rabbitmq/client/amqp/impl/AmqpMessage.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ final class AmqpMessage implements Message {
3535

3636
private final org.apache.qpid.protonj2.client.Message<byte[]> delegate;
3737

38+
private boolean durableIsSet = false;
39+
3840
AmqpMessage() {
3941
this(org.apache.qpid.protonj2.client.Message.create(EMPTY_BODY));
4042
}
@@ -455,6 +457,14 @@ public byte[] body() {
455457
}
456458

457459
// header section
460+
461+
@Override
462+
public Message durable(boolean durable) {
463+
this.durableIsSet = true;
464+
callOnDelegate(m -> m.durable(durable));
465+
return this;
466+
}
467+
458468
@Override
459469
public boolean durable() {
460470
return returnFromDelegate(org.apache.qpid.protonj2.client.Message::durable);
@@ -535,6 +545,13 @@ public MessageAddressBuilder replyToAddress() {
535545
return new DefaultMessageAddressBuilder(this, DefaultMessageAddressBuilder.REPLY_TO_CALLBACK);
536546
}
537547

548+
AmqpMessage enforceDurability() throws ClientException {
549+
if (!this.durableIsSet) {
550+
this.delegate.durable(true);
551+
}
552+
return this;
553+
}
554+
538555
private static class DefaultMessageAddressBuilder
539556
extends DefaultAddressBuilder<MessageAddressBuilder> implements MessageAddressBuilder {
540557

src/main/java/com/rabbitmq/client/amqp/impl/AmqpPublisher.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,7 @@ final class AmqpPublisher extends ResourceBase implements Publisher {
8686
this.publishCall =
8787
msg -> {
8888
try {
89-
org.apache.qpid.protonj2.client.Message<?> nativeMessage =
90-
((AmqpMessage) msg).nativeMessage();
91-
return this.sender.send(nativeMessage.durable(true));
89+
return this.doSend((AmqpMessage) msg);
9290
} catch (ClientIllegalStateException e) {
9391
// the link is closed
9492
LOGGER.debug("Error while publishing: '{}'. Closing publisher.", e.getMessage());
@@ -154,6 +152,11 @@ private Status mapDeliveryState(DeliveryState in) {
154152
}
155153
}
156154

155+
private Tracker doSend(AmqpMessage msg) throws ClientException {
156+
msg.enforceDurability();
157+
return this.sender.send(msg.nativeMessage());
158+
}
159+
157160
private static MetricsCollector.PublishDisposition mapToPublishDisposition(Status status) {
158161
if (status == Status.ACCEPTED) {
159162
return MetricsCollector.PublishDisposition.ACCEPTED;

src/test/java/com/rabbitmq/client/amqp/impl/AmqpMessageTest.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,32 @@ public class AmqpMessageTest {
2525

2626
@Test
2727
void toShouldBePathEncoded() {
28-
assertThat(new AmqpMessage().toAddress().exchange("foo bar").message().to())
28+
assertThat(msg().toAddress().exchange("foo bar").message().to())
2929
.isEqualTo("/exchanges/foo%20bar");
3030
}
3131

3232
@Test
3333
void replyToShouldBePathEncoded() {
34-
assertThat(new AmqpMessage().replyToAddress().exchange("foo bar").message().replyTo())
34+
assertThat(msg().replyToAddress().exchange("foo bar").message().replyTo())
3535
.isEqualTo("/exchanges/foo%20bar");
3636
}
37+
38+
@Test
39+
void shouldBeNonDurableOnlyIfExplicitlySet() throws Exception {
40+
AmqpMessage msg = msg();
41+
// durable by default
42+
assertThat(msg.enforceDurability().nativeMessage().durable()).isTrue();
43+
// non-durable explicitly set
44+
msg = msg();
45+
msg.durable(false);
46+
assertThat(msg.enforceDurability().nativeMessage().durable()).isFalse();
47+
// durable explicitly set
48+
msg = msg();
49+
msg.durable(true);
50+
assertThat(msg.enforceDurability().nativeMessage().durable()).isTrue();
51+
}
52+
53+
private static AmqpMessage msg() {
54+
return new AmqpMessage();
55+
}
3756
}

src/test/java/com/rabbitmq/client/amqp/impl/AmqpTest.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import com.rabbitmq.client.amqp.AmqpException;
3838
import com.rabbitmq.client.amqp.Connection;
39+
import com.rabbitmq.client.amqp.ConsumerBuilder;
3940
import com.rabbitmq.client.amqp.Environment;
4041
import com.rabbitmq.client.amqp.Management;
4142
import com.rabbitmq.client.amqp.Message;
@@ -55,6 +56,7 @@
5556
import org.junit.jupiter.api.*;
5657
import org.junit.jupiter.params.ParameterizedTest;
5758
import org.junit.jupiter.params.provider.CsvSource;
59+
import org.junit.jupiter.params.provider.EnumSource;
5860
import org.junit.jupiter.params.provider.ValueSource;
5961

6062
@AmqpTestInfrastructure
@@ -906,4 +908,75 @@ void messageAnnotationsSupportListMapArray() {
906908
ints = (int[]) m.annotation("x-arrayInt");
907909
org.assertj.core.api.Assertions.assertThat(ints).containsExactly(4, 5, 6);
908910
}
911+
912+
@ParameterizedTest
913+
@CsvSource({
914+
"CLASSIC,true",
915+
"CLASSIC,false",
916+
"QUORUM,true",
917+
"QUORUM,false",
918+
"STREAM,true",
919+
"STREAM,false"
920+
})
921+
void explicitDurabilityShouldBeEnforced(Management.QueueType type, boolean durable) {
922+
try {
923+
connection.management().queue(this.name).type(type).declare();
924+
Publisher p = connection.publisherBuilder().queue(this.name).build();
925+
p.publish(p.message().durable(durable), ctx -> {});
926+
927+
Sync consumeSync = sync();
928+
AtomicReference<Message> messageRef = new AtomicReference<>();
929+
ConsumerBuilder builder =
930+
connection
931+
.consumerBuilder()
932+
.queue(this.name)
933+
.messageHandler(
934+
(context, message) -> {
935+
messageRef.set(message);
936+
context.accept();
937+
consumeSync.down();
938+
});
939+
if (type == STREAM) {
940+
builder.stream().offset(ConsumerBuilder.StreamOffsetSpecification.FIRST);
941+
}
942+
builder.build();
943+
assertThat(consumeSync).completes();
944+
Message message = messageRef.get();
945+
assertThat(message).isDurable(durable);
946+
} finally {
947+
connection.management().queueDelete(this.name);
948+
}
949+
}
950+
951+
@ParameterizedTest
952+
@EnumSource(Management.QueueType.class)
953+
void durableByDefault(Management.QueueType type) {
954+
try {
955+
connection.management().queue(this.name).type(type).declare();
956+
Publisher p = connection.publisherBuilder().queue(this.name).build();
957+
p.publish(p.message(), ctx -> {});
958+
959+
Sync consumeSync = sync();
960+
AtomicReference<Message> messageRef = new AtomicReference<>();
961+
ConsumerBuilder builder =
962+
connection
963+
.consumerBuilder()
964+
.queue(this.name)
965+
.messageHandler(
966+
(context, message) -> {
967+
messageRef.set(message);
968+
context.accept();
969+
consumeSync.down();
970+
});
971+
if (type == STREAM) {
972+
builder.stream().offset(ConsumerBuilder.StreamOffsetSpecification.FIRST);
973+
}
974+
builder.build();
975+
assertThat(consumeSync).completes();
976+
Message message = messageRef.get();
977+
assertThat(message).isDurable(true);
978+
} finally {
979+
connection.management().queueDelete(this.name);
980+
}
981+
}
909982
}

src/test/java/com/rabbitmq/client/amqp/impl/Assertions.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,13 @@ private MessageAssert hasField(String fieldLabel, Object value, Object expected)
359359
.isEqualTo(expected);
360360
return this;
361361
}
362+
363+
void isDurable(boolean durable) {
364+
isNotNull();
365+
if (actual.durable() != durable) {
366+
fail("Message durable flag should be %s but is %s", durable, actual.durable());
367+
}
368+
}
362369
}
363370

364371
static class ConnectionAssert extends AbstractObjectAssert<ConnectionAssert, AmqpConnection> {

0 commit comments

Comments
 (0)