Skip to content

Commit 11323fe

Browse files
committed
set message durable as default
releaded to rabbitmq/rabbitmq-server#13918 Signed-off-by: Gabriele Santomaggio <[email protected]>
1 parent 71bc439 commit 11323fe

File tree

2 files changed

+65
-32
lines changed

2 files changed

+65
-32
lines changed

rabbitmq_amqp_python_client/qpid/proton/_message.py

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class Message(object):
110110
""" Default AMQP message priority"""
111111

112112
def __init__(
113-
self, body: Union[bytes, None] = None, inferred=True, **kwargs
113+
self, body: Union[bytes, None] = None, inferred=True, durable=True, **kwargs
114114
) -> None:
115115
# validate the types
116116

@@ -120,6 +120,7 @@ def __init__(
120120
self.properties = None
121121
self.body = body
122122
self.inferred = inferred
123+
self.durable = durable
123124

124125
for k, v in kwargs.items():
125126
getattr(self, k) # Raise exception if it's not a valid attribute.
@@ -504,7 +505,7 @@ def instructions(self) -> Optional[AnnotationDict]:
504505

505506
@instructions.setter
506507
def instructions(
507-
self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]]
508+
self, instructions: Optional[Dict[Union[str, int], "PythonAMQPData"]]
508509
) -> None:
509510
if isinstance(instructions, dict):
510511
self.instruction_dict = AnnotationDict(instructions, raise_on_error=False)
@@ -527,7 +528,7 @@ def annotations(self) -> Optional[AnnotationDict]:
527528

528529
@annotations.setter
529530
def annotations(
530-
self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]]
531+
self, annotations: Optional[Dict[Union[str, int], "PythonAMQPData"]]
531532
) -> None:
532533
if isinstance(annotations, dict):
533534
self.annotation_dict = AnnotationDict(annotations, raise_on_error=False)
@@ -594,7 +595,8 @@ def send(self, sender: "Sender", tag: Optional[str] = None) -> "Delivery":
594595
return dlv
595596

596597
@overload
597-
def recv(self, link: "Sender") -> None: ...
598+
def recv(self, link: "Sender") -> None:
599+
...
598600

599601
def recv(self, link: "Receiver") -> Optional["Delivery"]:
600602
"""
@@ -625,24 +627,24 @@ def recv(self, link: "Receiver") -> Optional["Delivery"]:
625627
def __repr__(self) -> str:
626628
props = []
627629
for attr in (
628-
"inferred",
629-
"address",
630-
"reply_to",
631-
"durable",
632-
"ttl",
633-
"priority",
634-
"first_acquirer",
635-
"delivery_count",
636-
"id",
637-
"correlation_id",
638-
"user_id",
639-
"group_id",
640-
"group_sequence",
641-
"reply_to_group_id",
642-
"instructions",
643-
"annotations",
644-
"properties",
645-
"body",
630+
"inferred",
631+
"address",
632+
"reply_to",
633+
"durable",
634+
"ttl",
635+
"priority",
636+
"first_acquirer",
637+
"delivery_count",
638+
"id",
639+
"correlation_id",
640+
"user_id",
641+
"group_id",
642+
"group_sequence",
643+
"reply_to_group_id",
644+
"instructions",
645+
"annotations",
646+
"properties",
647+
"body",
646648
):
647649
value = getattr(self, attr)
648650
if value:

tests/test_publisher.py

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ def test_validate_message_for_publishing(connection: Connection) -> None:
4545

4646

4747
def test_publish_queue(connection: Connection) -> None:
48-
4948
queue_name = "test-queue"
5049
management = connection.management()
5150

@@ -77,7 +76,6 @@ def test_publish_queue(connection: Connection) -> None:
7776

7877

7978
def test_publish_per_message(connection: Connection) -> None:
80-
8179
queue_name = "test-queue-1"
8280
queue_name_2 = "test-queue-2"
8381
management = connection.management()
@@ -123,7 +121,6 @@ def test_publish_per_message(connection: Connection) -> None:
123121

124122

125123
def test_publish_ssl(connection_ssl: Connection) -> None:
126-
127124
queue_name = "test-queue"
128125
management = connection_ssl.management()
129126

@@ -148,7 +145,6 @@ def test_publish_ssl(connection_ssl: Connection) -> None:
148145

149146

150147
def test_publish_to_invalid_destination(connection: Connection) -> None:
151-
152148
queue_name = "test-queue"
153149

154150
raised = False
@@ -169,7 +165,6 @@ def test_publish_to_invalid_destination(connection: Connection) -> None:
169165

170166

171167
def test_publish_per_message_to_invalid_destination(connection: Connection) -> None:
172-
173168
queue_name = "test-queue-1"
174169
raised = False
175170

@@ -193,7 +188,6 @@ def test_publish_per_message_to_invalid_destination(connection: Connection) -> N
193188

194189

195190
def test_publish_per_message_both_address(connection: Connection) -> None:
196-
197191
queue_name = "test-queue-1"
198192
raised = False
199193

@@ -223,7 +217,6 @@ def test_publish_per_message_both_address(connection: Connection) -> None:
223217

224218

225219
def test_publish_exchange(connection: Connection) -> None:
226-
227220
exchange_name = "test-exchange"
228221
queue_name = "test-queue"
229222
management = connection.management()
@@ -342,7 +335,6 @@ def test_disconnection_reconnection() -> None:
342335

343336

344337
def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
345-
346338
stream_name = "test_stream_info_with_validation"
347339
messages_to_send = 200
348340

@@ -361,7 +353,6 @@ def test_queue_info_for_stream_with_validations(connection: Connection) -> None:
361353

362354

363355
def test_publish_per_message_exchange(connection: Connection) -> None:
364-
365356
exchange_name = "test-exchange-per-message"
366357
queue_name = "test-queue-per-message"
367358
management = connection.management()
@@ -407,7 +398,6 @@ def test_publish_per_message_exchange(connection: Connection) -> None:
407398

408399

409400
def test_multiple_publishers(environment: Environment) -> None:
410-
411401
stream_name = "test_multiple_publisher_1"
412402
stream_name_2 = "test_multiple_publisher_2"
413403
connection = environment.connection()
@@ -456,3 +446,44 @@ def test_multiple_publishers(environment: Environment) -> None:
456446
management.delete_queue(stream_name_2)
457447

458448
management.close()
449+
450+
451+
def test_durable_message(connection: Connection) -> None:
452+
queue_name = "test_durable_message"
453+
454+
management = connection.management()
455+
management.declare_queue(QuorumQueueSpecification(name=queue_name))
456+
destination = AddressHelper.queue_address(queue_name)
457+
publisher = connection.publisher(destination)
458+
# message should be durable by default
459+
status = publisher.publish(
460+
Message(
461+
body=Converter.string_to_bytes("test"),
462+
)
463+
)
464+
465+
assert status.remote_state == OutcomeState.ACCEPTED
466+
# message should be not durable by setting the durable to False by the user
467+
status = publisher.publish(
468+
Message(
469+
body=Converter.string_to_bytes("test"),
470+
durable=False,
471+
)
472+
)
473+
474+
assert status.remote_state == OutcomeState.ACCEPTED
475+
476+
consumer = connection.consumer(destination)
477+
should_be_durable = consumer.consume()
478+
assert should_be_durable.durable is True
479+
480+
should_be_not_durable = consumer.consume()
481+
assert should_be_not_durable.durable is False
482+
483+
consumer.close()
484+
485+
management.delete_queue(queue_name)
486+
487+
management.close()
488+
489+
pass

0 commit comments

Comments
 (0)