Skip to content

Commit c3fcc48

Browse files
authored
[SB] Batch size update (#38313)
* test * black * Update sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py * Update sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py * refactor * remove double init * pylint
1 parent c26f6a9 commit c3fcc48

File tree

5 files changed

+64
-9
lines changed

5 files changed

+64
-9
lines changed

sdk/servicebus/azure-servicebus/azure/servicebus/_common/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,8 @@
135135
DEAD_LETTER_SUPPLEMENTARY_AUTHORIZATION_HEADER = "ServiceBusDlqSupplementaryAuthorization"
136136

137137
MAX_MESSAGE_LENGTH_BYTES = 1024 * 1024 # Backcompat with uAMQP
138+
MAX_BATCH_SIZE_STANDARD = 256 * 1024
139+
MAX_BATCH_SIZE_PREMIUM = 1024 * 1024
138140
MESSAGE_PROPERTY_MAX_LENGTH = 128
139141
# .NET TimeSpan.MaxValue: 10675199.02:48:05.4775807
140142
MAX_DURATION_VALUE = 922337203685477

sdk/servicebus/azure-servicebus/azure/servicebus/_servicebus_sender.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@
3636
MGMT_REQUEST_MESSAGE_ID,
3737
MGMT_REQUEST_PARTITION_KEY,
3838
MAX_MESSAGE_LENGTH_BYTES,
39+
MAX_BATCH_SIZE_STANDARD,
40+
MAX_BATCH_SIZE_PREMIUM,
3941
)
4042

4143
if TYPE_CHECKING:
@@ -81,7 +83,6 @@ def _create_attribute(self, **kwargs):
8183
# TODO: What's the retry overlap between servicebus and pyamqp?
8284
self._error_policy = self._amqp_transport.create_retry_policy(self._config)
8385
self._name = kwargs.get("client_identifier") or f"SBSender-{uuid.uuid4()}"
84-
self._max_message_size_on_link = 0
8586
self.entity_name: str = self._entity_name
8687

8788
@classmethod
@@ -187,8 +188,8 @@ def __init__(
187188
topic_name=topic_name,
188189
**kwargs,
189190
)
190-
191191
self._max_message_size_on_link = 0
192+
self._max_batch_size_on_link = 0
192193
self._create_attribute(**kwargs)
193194
self._connection = kwargs.get("connection")
194195
self._handler: Union["pyamqp_SendClientSync", "uamqp_SendClientSync"]
@@ -267,6 +268,10 @@ def _open(self):
267268
self._max_message_size_on_link = (
268269
self._amqp_transport.get_remote_max_message_size(self._handler) or MAX_MESSAGE_LENGTH_BYTES
269270
)
271+
if self._max_message_size_on_link > MAX_BATCH_SIZE_PREMIUM:
272+
self._max_batch_size_on_link = MAX_BATCH_SIZE_PREMIUM
273+
else:
274+
self._max_batch_size_on_link = MAX_BATCH_SIZE_STANDARD
270275
except:
271276
self._close_handler()
272277
raise
@@ -501,14 +506,14 @@ def create_message_batch(self, max_size_in_bytes: Optional[int] = None) -> Servi
501506
if not self._max_message_size_on_link:
502507
self._open_with_retry()
503508

504-
if max_size_in_bytes and max_size_in_bytes > self._max_message_size_on_link:
509+
if max_size_in_bytes and max_size_in_bytes > self._max_batch_size_on_link:
505510
raise ValueError(
506511
f"Max message size: {max_size_in_bytes} is too large, "
507-
f"acceptable max batch size is: {self._max_message_size_on_link} bytes."
512+
f"acceptable max batch size is: {self._max_batch_size_on_link} bytes."
508513
)
509514

510515
return ServiceBusMessageBatch(
511-
max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link),
516+
max_size_in_bytes=(max_size_in_bytes or self._max_batch_size_on_link),
512517
amqp_transport=self._amqp_transport,
513518
tracing_attributes={
514519
TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace,

sdk/servicebus/azure-servicebus/azure/servicebus/aio/_servicebus_sender_async.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
REQUEST_RESPONSE_CANCEL_SCHEDULED_MESSAGE_OPERATION,
2323
MGMT_REQUEST_SEQUENCE_NUMBERS,
2424
MAX_MESSAGE_LENGTH_BYTES,
25+
MAX_BATCH_SIZE_PREMIUM,
26+
MAX_BATCH_SIZE_STANDARD,
2527
)
2628
from .._common import mgmt_handlers
2729
from .._common.utils import transform_outbound_messages
@@ -140,8 +142,8 @@ def __init__(
140142
topic_name=topic_name,
141143
**kwargs,
142144
)
143-
144145
self._max_message_size_on_link = 0
146+
self._max_batch_size_on_link = 0
145147
self._create_attribute(**kwargs)
146148
self._connection = kwargs.get("connection")
147149
self._handler: Union["pyamqp_SendClientAsync", "uamqp_SendClientAsync"]
@@ -215,6 +217,10 @@ async def _open(self):
215217
self._max_message_size_on_link = (
216218
self._amqp_transport.get_remote_max_message_size(self._handler) or MAX_MESSAGE_LENGTH_BYTES
217219
)
220+
if self._max_message_size_on_link > MAX_BATCH_SIZE_PREMIUM:
221+
self._max_batch_size_on_link = MAX_BATCH_SIZE_PREMIUM
222+
else:
223+
self._max_batch_size_on_link = MAX_BATCH_SIZE_STANDARD
218224
except:
219225
await self._close_handler()
220226
raise
@@ -458,14 +464,14 @@ async def create_message_batch(self, max_size_in_bytes: Optional[int] = None) ->
458464
if not self._max_message_size_on_link:
459465
await self._open_with_retry()
460466

461-
if max_size_in_bytes and max_size_in_bytes > self._max_message_size_on_link:
467+
if max_size_in_bytes and max_size_in_bytes > self._max_batch_size_on_link:
462468
raise ValueError(
463469
f"Max message size: {max_size_in_bytes} is too large, "
464-
f"acceptable max batch size is: {self._max_message_size_on_link} bytes."
470+
f"acceptable max batch size is: {self._max_batch_size_on_link} bytes."
465471
)
466472

467473
return ServiceBusMessageBatch(
468-
max_size_in_bytes=(max_size_in_bytes or self._max_message_size_on_link),
474+
max_size_in_bytes=(max_size_in_bytes or self._max_batch_size_on_link),
469475
amqp_transport=self._amqp_transport,
470476
tracing_attributes={
471477
TraceAttributes.TRACE_NET_PEER_NAME_ATTRIBUTE: self.fully_qualified_namespace,

sdk/servicebus/azure-servicebus/tests/async_tests/test_queues_async.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1996,6 +1996,27 @@ def message_content():
19961996
await receiver.complete_message(message)
19971997
count += 1
19981998

1999+
@pytest.mark.liveTest
2000+
@pytest.mark.live_test_only
2001+
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")
2002+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
2003+
@ServiceBusQueuePreparer(name_prefix="servicebustest", dead_lettering_on_message_expiration=True)
2004+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
2005+
@ArgPasserAsync()
2006+
async def test_async_queue_message_batch_large(
2007+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
2008+
):
2009+
2010+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
2011+
credential = get_credential(is_async=True)
2012+
async with ServiceBusClient(
2013+
fully_qualified_namespace, credential, logging_enable=False, uamqp_transport=uamqp_transport
2014+
) as sb_client:
2015+
async with sb_client.get_queue_sender(servicebus_queue.name) as sender:
2016+
with pytest.raises(ValueError):
2017+
batch_message = [ServiceBusMessage("test message " * 256 * 1024)]
2018+
await sender.send_messages(batch_message)
2019+
19992020
@pytest.mark.asyncio
20002021
@pytest.mark.liveTest
20012022
@pytest.mark.live_test_only

sdk/servicebus/azure-servicebus/tests/test_queues.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1922,6 +1922,27 @@ def message_content():
19221922
receiver.complete_message(message)
19231923
count += 1
19241924

1925+
@pytest.mark.liveTest
1926+
@pytest.mark.live_test_only
1927+
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")
1928+
@CachedServiceBusNamespacePreparer(name_prefix="servicebustest")
1929+
@ServiceBusQueuePreparer(name_prefix="servicebustest", dead_lettering_on_message_expiration=True)
1930+
@pytest.mark.parametrize("uamqp_transport", uamqp_transport_params, ids=uamqp_transport_ids)
1931+
@ArgPasser()
1932+
def test_queue_message_batch_large(
1933+
self, uamqp_transport, *, servicebus_namespace=None, servicebus_queue=None, **kwargs
1934+
):
1935+
1936+
fully_qualified_namespace = f"{servicebus_namespace.name}{SERVICEBUS_ENDPOINT_SUFFIX}"
1937+
credential = get_credential()
1938+
with ServiceBusClient(
1939+
fully_qualified_namespace, credential, logging_enable=False, uamqp_transport=uamqp_transport
1940+
) as sb_client:
1941+
with sb_client.get_queue_sender(servicebus_queue.name) as sender:
1942+
with pytest.raises(ValueError):
1943+
batch_message = [ServiceBusMessage("test message " * 256 * 1024)]
1944+
sender.send_messages(batch_message)
1945+
19251946
@pytest.mark.liveTest
19261947
@pytest.mark.live_test_only
19271948
@CachedServiceBusResourceGroupPreparer(name_prefix="servicebustest")

0 commit comments

Comments
 (0)