Skip to content

Commit 07ec51d

Browse files
committed
set message durable as default
releaded to rabbitmq/rabbitmq-server#13918 Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 29ead26 commit 07ec51d

File tree

4 files changed

+86
-2
lines changed

4 files changed

+86
-2
lines changed

pkg/rabbitmqamqp/amqp_publisher.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ RabbitMQ supports the following DeliveryState types:
6262
- StateRejected
6363
See: https://www.rabbitmq.com/docs/next/amqp#outcomes for more information.
6464
65-
Note: If the destination address is not defined during the creation, the message must have a TO property set.
65+
If the destination address is not defined during the creation, the message must have a TO property set.
6666
You can use the helper "MessagePropertyToAddress" to create the destination address.
6767
See the examples:
6868
Create a new publisher that sends messages to a specific destination address:
@@ -84,6 +84,16 @@ Create a new publisher that sends messages based on message destination address:
8484
..:= MessagePropertyToAddress(msg, &QueueAddress{Queue: "myQueueName"})
8585
..:= publisher.Publish(context.Background(), msg)
8686
87+
</code>
88+
89+
The message is persistent by default by setting the Header.Durable to true when Header is nil.
90+
You can set the message to be non-persistent by setting the Header.Durable to false.
91+
Note:
92+
When you use the `Header` is up to you to set the message properties,
93+
You need set the `Header.Durable` to true or false.
94+
95+
<code>
96+
8797
</code>
8898
*/
8999
func (m *Publisher) Publish(ctx context.Context, message *amqp.Message) (*PublishResult, error) {

pkg/rabbitmqamqp/amqp_publisher_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,4 +203,62 @@ var _ = Describe("AMQP publisher ", func() {
203203

204204
Expect(connection.Close(context.Background())).To(BeNil())
205205
})
206+
207+
It("Message should durable by default", func() {
208+
// https://github.com/rabbitmq/rabbitmq-server/pull/13918
209+
210+
// Here we test the default behavior of the message durability
211+
// The lib should set the Header.Durable to true by default
212+
// when the Header is set by the user
213+
// it is up to the user to set the Header.Durable to true or false
214+
connection, err := Dial(context.Background(), "amqp://", nil)
215+
Expect(err).To(BeNil())
216+
Expect(connection).NotTo(BeNil())
217+
name := generateNameWithDateTime("Message should durable by default")
218+
_, err = connection.Management().DeclareQueue(context.Background(), &QuorumQueueSpecification{
219+
Name: name,
220+
})
221+
Expect(err).To(BeNil())
222+
223+
publisher, err := connection.NewPublisher(context.Background(), &QueueAddress{Queue: name}, nil)
224+
Expect(err).To(BeNil())
225+
Expect(publisher).NotTo(BeNil())
226+
227+
msg := NewMessage([]byte("hello"))
228+
Expect(msg.Header).To(BeNil())
229+
publishResult, err := publisher.Publish(context.Background(), msg)
230+
Expect(err).To(BeNil())
231+
Expect(publishResult).NotTo(BeNil())
232+
Expect(publishResult.Outcome).To(Equal(&StateAccepted{}))
233+
Expect(msg.Header).NotTo(BeNil())
234+
Expect(msg.Header.Durable).To(BeTrue())
235+
236+
consumer, err := connection.NewConsumer(context.Background(), name, nil)
237+
Expect(err).To(BeNil())
238+
Expect(consumer).NotTo(BeNil())
239+
dc, err := consumer.Receive(context.Background())
240+
Expect(err).To(BeNil())
241+
Expect(dc).NotTo(BeNil())
242+
Expect(dc.Message().Header).NotTo(BeNil())
243+
Expect(dc.Message().Header.Durable).To(BeTrue())
244+
Expect(dc.Accept(context.Background())).To(BeNil())
245+
246+
msgNotPersistent := NewMessageWithPersistence([]byte("hello"), false)
247+
publishResult, err = publisher.Publish(context.Background(), msgNotPersistent)
248+
Expect(err).To(BeNil())
249+
Expect(publishResult).NotTo(BeNil())
250+
Expect(publishResult.Outcome).To(Equal(&StateAccepted{}))
251+
Expect(msgNotPersistent.Header).NotTo(BeNil())
252+
Expect(msgNotPersistent.Header.Durable).To(BeFalse())
253+
dc, err = consumer.Receive(context.Background())
254+
Expect(err).To(BeNil())
255+
Expect(dc).NotTo(BeNil())
256+
Expect(dc.Message().Header).NotTo(BeNil())
257+
Expect(dc.Message().Header.Durable).To(BeFalse())
258+
Expect(dc.Accept(context.Background())).To(BeNil())
259+
Expect(publisher.Close(context.Background())).To(BeNil())
260+
Expect(connection.Management().DeleteQueue(context.Background(), name)).To(BeNil())
261+
Expect(connection.Close(context.Background())).To(BeNil())
262+
263+
})
206264
})

pkg/rabbitmqamqp/amqp_queue.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,18 @@ func (a *AmqpQueueInfo) Members() []string {
2929
}
3030

3131
func newAmqpQueueInfo(response map[string]any) *AmqpQueueInfo {
32+
leader := ""
33+
if response["leader"] != nil {
34+
leader = response["leader"].(string)
35+
}
36+
3237
return &AmqpQueueInfo{
3338
name: response["name"].(string),
3439
isDurable: response["durable"].(bool),
3540
isAutoDelete: response["auto_delete"].(bool),
3641
isExclusive: response["exclusive"].(bool),
3742
queueType: TQueueType(response["type"].(string)),
38-
leader: response["leader"].(string),
43+
leader: leader,
3944
members: response["replicas"].([]string),
4045
arguments: response["arguments"].(map[string]any),
4146
consumerCount: response["consumer_count"].(uint32),

pkg/rabbitmqamqp/messages_helper.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,17 @@ func NewMessage(body []byte) *amqp.Message {
3030
return amqp.NewMessage(body)
3131
}
3232

33+
// NewMessageWithHeader creates a new AMQP 1.0 message with the given payload and sets the persistence to the given value.
34+
// The persistence is set by setting the Header.Durable property to true or false.
35+
36+
func NewMessageWithPersistence(body []byte, persistence bool) *amqp.Message {
37+
m := amqp.NewMessage(body)
38+
m.Header = &amqp.MessageHeader{
39+
Durable: persistence,
40+
}
41+
return m
42+
}
43+
3344
// NewMessageWithAddress creates a new AMQP 1.0 new message with the given payload and sets the To property to the address of the target.
3445
// The target must be a QueueAddress or an ExchangeAddress.
3546
// This function is a helper that combines NewMessage and MessagePropertyToAddress.

0 commit comments

Comments
 (0)