Skip to content

Commit a51d0a4

Browse files
authored
[ServiceBus] revert link credit calc to cumulative (#40300)
* [ServiceBus] revert link credit calc to cumulative * copy sb pyamqp to eh * fix link credit pyamqp test
1 parent 433a737 commit a51d0a4

File tree

13 files changed

+21
-88
lines changed

13 files changed

+21
-88
lines changed

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_client_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ async def _keep_alive_async(self):
147147
elapsed_time = current_time - start_time
148148
if elapsed_time >= self._keep_alive_interval:
149149
await asyncio.shield(
150-
self._connection.listen(wait=self._socket_timeout, batch=self._link.total_link_credit)
150+
self._connection.listen(wait=self._socket_timeout, batch=self._link.current_link_credit)
151151
)
152152
start_time = current_time
153153
await asyncio.sleep(1)
@@ -733,7 +733,7 @@ async def _client_run_async(self, **kwargs):
733733
:rtype: bool
734734
"""
735735
try:
736-
if self._link.total_link_credit <= 0:
736+
if self._link.current_link_credit<= 0:
737737
await self._link.flow(link_credit=self._link_credit)
738738
await self._connection.listen(wait=self._socket_timeout, **kwargs)
739739
except ValueError:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_link_async.py

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ def __init__(
9696
self._on_link_state_change = kwargs.get("on_link_state_change")
9797
self._on_attach = kwargs.get("on_attach")
9898
self._error: Optional[AMQPLinkError] = None
99-
self.total_link_credit = self.link_credit
10099

101100
async def __aenter__(self) -> "Link":
102101
await self.attach()
@@ -276,19 +275,6 @@ async def detach(self, close: bool = False, error: Optional[AMQPError] = None) -
276275
await self._set_state(LinkState.DETACHED)
277276

278277
async def flow(self, *, link_credit: Optional[int] = None, **kwargs) -> None:
279-
# Given the desired link credit `link_credit`, the link credit sent via
280-
# FlowFrame is calculated as follows: The link credit to flow on the wire
281-
# `self.current_link_credit` is the desired link credit `link_credit`
282-
# minus the current link credit on the wire `self.total_link_credit`.
283-
self.current_link_credit = link_credit - self.total_link_credit if link_credit is not None else self.link_credit
284-
285-
# If the link credit to flow is greater than 0 (i.e the desired link credit
286-
# is greater than the current link credit on the wire), then we will send a
287-
# flow to issue more link credit. Otherwise link credit on the wire is sufficient.
288-
if self.current_link_credit > 0:
289-
# Calculate the total link credit on the wire, by adding the credit
290-
# we will flow to the total link credit.
291-
self.total_link_credit = (
292-
self.current_link_credit + self.total_link_credit if link_credit is not None else self.link_credit
293-
)
294-
await self._outgoing_flow(**kwargs)
278+
# Reset link credit to the default and flow
279+
self.current_link_credit = link_credit if link_credit is not None else self.link_credit
280+
await self._outgoing_flow(**kwargs)

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/aio/_receiver_async.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ async def _incoming_transfer(self, frame):
6464
if not frame[5]:
6565
self.delivery_count += 1
6666
self.current_link_credit -= 1
67-
self.total_link_credit -= 1
6867
if self.received_delivery_id is not None:
6968
self._first_frame = frame
7069
if not self.received_delivery_id and not self._received_payload:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ def _keep_alive(self):
237237
current_time = time.time()
238238
elapsed_time = current_time - start_time
239239
if elapsed_time >= self._keep_alive_interval:
240-
self._connection.listen(wait=self._socket_timeout, batch=self._link.total_link_credit)
240+
self._connection.listen(wait=self._socket_timeout, batch=self._link.current_link_credit)
241241
start_time = current_time
242242
time.sleep(1)
243243
except Exception as e: # pylint: disable=broad-except
@@ -852,7 +852,7 @@ def _client_run(self, **kwargs):
852852
:rtype: bool
853853
"""
854854
try:
855-
if self._link.total_link_credit <= 0:
855+
if self._link.current_link_credit <= 0:
856856
self._link.flow(link_credit=self._link_credit)
857857
self._connection.listen(wait=self._socket_timeout, **kwargs)
858858
except ValueError:

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/link.py

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ def __init__(
9292
self._on_link_state_change = kwargs.get("on_link_state_change")
9393
self._on_attach = kwargs.get("on_attach")
9494
self._error: Optional[AMQPLinkError] = None
95-
self.total_link_credit = self.link_credit
9695

9796
def __enter__(self) -> "Link":
9897
self.attach()
@@ -273,18 +272,5 @@ def detach(self, close: bool = False, error: Optional[AMQPError] = None) -> None
273272
self._set_state(LinkState.DETACHED)
274273

275274
def flow(self, *, link_credit: Optional[int] = None, **kwargs: Any) -> None:
276-
# Given the desired link credit `link_credit`, the link credit sent via
277-
# FlowFrame is calculated as follows: The link credit to flow on the wire
278-
# `self.current_link_credit` is the desired link credit
279-
# `link_credit` minus the current link credit on the wire `self.total_link_credit`.
280-
self.current_link_credit = link_credit - self.total_link_credit if link_credit is not None else self.link_credit
281-
282-
# If the link credit to flow is greater than 0 (i.e the desired link credit is greater than
283-
# the current link credit on the wire), then we will send a flow to issue more link credit.
284-
# Otherwise link credit on the wire is sufficient.
285-
if self.current_link_credit > 0:
286-
# Calculate the total link credit on the wire, by adding the credit we will flow to the total link credit.
287-
self.total_link_credit = (
288-
self.current_link_credit + self.total_link_credit if link_credit is not None else self.link_credit
289-
)
290-
self._outgoing_flow(**kwargs)
275+
self.current_link_credit = link_credit if link_credit is not None else self.link_credit
276+
self._outgoing_flow(**kwargs)

sdk/eventhub/azure-eventhub/azure/eventhub/_pyamqp/receiver.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ def _incoming_transfer(self, frame):
6060
# If more is false --> this is the last frame of the message
6161
if not frame[5]:
6262
self.current_link_credit -= 1
63-
self.total_link_credit -= 1
6463
self.delivery_count += 1
6564
self.received_delivery_id = frame[1] # delivery_id
6665
if self.received_delivery_id is not None:

sdk/eventhub/azure-eventhub/tests/pyamqp_tests/unittest/test_link.py

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -131,8 +131,6 @@ def mock_outgoing():
131131
)
132132

133133
link._outgoing_flow = mock_outgoing
134-
link.total_link_credit = 0 # Set the total link credit to 0 to start, no credit on the wire
135-
136134
link.flow(link_credit=100) # Send a flow frame with desired link credit of 100
137135

138136
# frame: handle, delivery_id, delivery_tag, message_format, settled, more, rcv_settle_mode, state, resume, aborted, batchable, payload
@@ -142,20 +140,15 @@ def mock_outgoing():
142140

143141
link._incoming_transfer(transfer_frame_one)
144142
assert link.current_link_credit == 99
145-
assert link.total_link_credit == 99
146143

147144
# Only received 1 transfer frame per receive call, we set desired link credit again
148145
# this will send a flow of 1
149146
link.flow(link_credit=100)
150-
assert link.current_link_credit == 1
151-
assert link.total_link_credit == 100
152-
147+
assert link.current_link_credit == 100
153148
link._incoming_transfer(transfer_frame_two)
154-
assert link.current_link_credit == 0
155-
assert link.total_link_credit == 99
149+
assert link.current_link_credit == 99
156150
link._incoming_transfer(transfer_frame_three)
157-
assert link.current_link_credit == -1
158-
assert link.total_link_credit == 98
151+
assert link.current_link_credit == 98
159152

160153
@pytest.mark.parametrize(
161154
"frame",

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_client_async.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ async def _keep_alive_async(self):
147147
elapsed_time = current_time - start_time
148148
if elapsed_time >= self._keep_alive_interval:
149149
await asyncio.shield(
150-
self._connection.listen(wait=self._socket_timeout, batch=self._link.total_link_credit)
150+
self._connection.listen(wait=self._socket_timeout, batch=self._link.current_link_credit)
151151
)
152152
start_time = current_time
153153
await asyncio.sleep(1)
@@ -733,7 +733,7 @@ async def _client_run_async(self, **kwargs):
733733
:rtype: bool
734734
"""
735735
try:
736-
if self._link.total_link_credit <= 0:
736+
if self._link.current_link_credit<= 0:
737737
await self._link.flow(link_credit=self._link_credit)
738738
await self._connection.listen(wait=self._socket_timeout, **kwargs)
739739
except ValueError:

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_link_async.py

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ def __init__(
9696
self._on_link_state_change = kwargs.get("on_link_state_change")
9797
self._on_attach = kwargs.get("on_attach")
9898
self._error: Optional[AMQPLinkError] = None
99-
self.total_link_credit = self.link_credit
10099

101100
async def __aenter__(self) -> "Link":
102101
await self.attach()
@@ -276,19 +275,6 @@ async def detach(self, close: bool = False, error: Optional[AMQPError] = None) -
276275
await self._set_state(LinkState.DETACHED)
277276

278277
async def flow(self, *, link_credit: Optional[int] = None, **kwargs) -> None:
279-
# Given the desired link credit `link_credit`, the link credit sent via
280-
# FlowFrame is calculated as follows: The link credit to flow on the wire
281-
# `self.current_link_credit` is the desired link credit `link_credit`
282-
# minus the current link credit on the wire `self.total_link_credit`.
283-
self.current_link_credit = link_credit - self.total_link_credit if link_credit is not None else self.link_credit
284-
285-
# If the link credit to flow is greater than 0 (i.e the desired link credit
286-
# is greater than the current link credit on the wire), then we will send a
287-
# flow to issue more link credit. Otherwise link credit on the wire is sufficient.
288-
if self.current_link_credit > 0:
289-
# Calculate the total link credit on the wire, by adding the credit
290-
# we will flow to the total link credit.
291-
self.total_link_credit = (
292-
self.current_link_credit + self.total_link_credit if link_credit is not None else self.link_credit
293-
)
294-
await self._outgoing_flow(**kwargs)
278+
# Reset link credit to the default and flow
279+
self.current_link_credit = link_credit if link_credit is not None else self.link_credit
280+
await self._outgoing_flow(**kwargs)

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/aio/_receiver_async.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ async def _incoming_transfer(self, frame):
6464
if not frame[5]:
6565
self.delivery_count += 1
6666
self.current_link_credit -= 1
67-
self.total_link_credit -= 1
6867
if self.received_delivery_id is not None:
6968
self._first_frame = frame
7069
if not self.received_delivery_id and not self._received_payload:

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,7 +237,7 @@ def _keep_alive(self):
237237
current_time = time.time()
238238
elapsed_time = current_time - start_time
239239
if elapsed_time >= self._keep_alive_interval:
240-
self._connection.listen(wait=self._socket_timeout, batch=self._link.total_link_credit)
240+
self._connection.listen(wait=self._socket_timeout, batch=self._link.current_link_credit)
241241
start_time = current_time
242242
time.sleep(1)
243243
except Exception as e: # pylint: disable=broad-except
@@ -852,7 +852,7 @@ def _client_run(self, **kwargs):
852852
:rtype: bool
853853
"""
854854
try:
855-
if self._link.total_link_credit <= 0:
855+
if self._link.current_link_credit <= 0:
856856
self._link.flow(link_credit=self._link_credit)
857857
self._connection.listen(wait=self._socket_timeout, **kwargs)
858858
except ValueError:

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/link.py

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@ def __init__(
9292
self._on_link_state_change = kwargs.get("on_link_state_change")
9393
self._on_attach = kwargs.get("on_attach")
9494
self._error: Optional[AMQPLinkError] = None
95-
self.total_link_credit = self.link_credit
9695

9796
def __enter__(self) -> "Link":
9897
self.attach()
@@ -273,18 +272,5 @@ def detach(self, close: bool = False, error: Optional[AMQPError] = None) -> None
273272
self._set_state(LinkState.DETACHED)
274273

275274
def flow(self, *, link_credit: Optional[int] = None, **kwargs: Any) -> None:
276-
# Given the desired link credit `link_credit`, the link credit sent via
277-
# FlowFrame is calculated as follows: The link credit to flow on the wire
278-
# `self.current_link_credit` is the desired link credit
279-
# `link_credit` minus the current link credit on the wire `self.total_link_credit`.
280-
self.current_link_credit = link_credit - self.total_link_credit if link_credit is not None else self.link_credit
281-
282-
# If the link credit to flow is greater than 0 (i.e the desired link credit is greater than
283-
# the current link credit on the wire), then we will send a flow to issue more link credit.
284-
# Otherwise link credit on the wire is sufficient.
285-
if self.current_link_credit > 0:
286-
# Calculate the total link credit on the wire, by adding the credit we will flow to the total link credit.
287-
self.total_link_credit = (
288-
self.current_link_credit + self.total_link_credit if link_credit is not None else self.link_credit
289-
)
290-
self._outgoing_flow(**kwargs)
275+
self.current_link_credit = link_credit if link_credit is not None else self.link_credit
276+
self._outgoing_flow(**kwargs)

sdk/servicebus/azure-servicebus/azure/servicebus/_pyamqp/receiver.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ def _incoming_transfer(self, frame):
6060
# If more is false --> this is the last frame of the message
6161
if not frame[5]:
6262
self.current_link_credit -= 1
63-
self.total_link_credit -= 1
6463
self.delivery_count += 1
6564
self.received_delivery_id = frame[1] # delivery_id
6665
if self.received_delivery_id is not None:

0 commit comments

Comments
 (0)