Skip to content

Commit 53d4000

Browse files
committed
Do not block for broadcast and multicast requests
1 parent 6b8a6d8 commit 53d4000

File tree

4 files changed

+127
-34
lines changed

4 files changed

+127
-34
lines changed

tests/api/test_request.py

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ async def test_callback_rsp_cleanup_timeout_external(connected_znp):
9292
assert not znp._listeners
9393

9494

95-
async def test_callback_rsp_cleanup_timeout_internal(connected_znp):
95+
@pytest.mark.parametrize("background", [False, True])
96+
async def test_callback_rsp_cleanup_timeout_internal(background, connected_znp):
9697
znp, znp_server = connected_znp
9798
znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_SREQ_TIMEOUT] = 0.1
9899
znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_ARSP_TIMEOUT] = 0.1
@@ -104,6 +105,45 @@ async def test_callback_rsp_cleanup_timeout_internal(connected_znp):
104105
await znp.request_callback_rsp(
105106
request=c.Util.TimeAlive.Req(),
106107
callback=c.SYS.ResetInd.Callback(partial=True),
108+
background=background,
109+
)
110+
111+
# We should be cleaned up
112+
assert not znp._listeners
113+
114+
115+
async def test_callback_rsp_cleanup_background_error(connected_znp):
116+
znp, znp_server = connected_znp
117+
znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_SREQ_TIMEOUT] = 0.1
118+
znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_ARSP_TIMEOUT] = 0.1
119+
120+
assert not znp._listeners
121+
122+
# This request will timeout because we didn't send anything back
123+
with pytest.raises(asyncio.TimeoutError):
124+
await znp.request_callback_rsp(
125+
request=c.Util.TimeAlive.Req(),
126+
callback=c.SYS.ResetInd.Callback(partial=True),
127+
background=True,
128+
)
129+
130+
# We should be cleaned up
131+
assert not znp._listeners
132+
133+
134+
async def test_callback_rsp_cleanup_background_timeout(connected_znp):
135+
znp, znp_server = connected_znp
136+
znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_SREQ_TIMEOUT] = 0.1
137+
znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_ARSP_TIMEOUT] = 0.1
138+
139+
assert not znp._listeners
140+
141+
# This request will timeout because we didn't send anything back
142+
with pytest.raises(asyncio.TimeoutError):
143+
await znp.request_callback_rsp(
144+
request=c.Util.TimeAlive.Req(),
145+
callback=c.SYS.ResetInd.Callback(partial=True),
146+
background=True,
107147
)
108148

109149
# We should be cleaned up

tests/application/test_requests.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,39 @@ async def test_mrequest(device, make_application, mocker):
267267
await app.shutdown()
268268

269269

270+
@pytest.mark.parametrize("device", [FormedLaunchpadCC26X2R1])
271+
async def test_mrequest_doesnt_block(device, make_application, event_loop):
272+
app, znp_server = make_application(server_cls=device)
273+
274+
znp_server.reply_once_to(
275+
request=c.AF.DataRequestExt.Req(
276+
DstAddrModeAddress=t.AddrModeAddress(mode=t.AddrMode.Group, address=0x1234),
277+
ClusterId=0x0006,
278+
partial=True,
279+
),
280+
responses=[
281+
# Confirm the request immediately but do not send a callback response until
282+
# *after* the group request is "done".
283+
c.AF.DataRequestExt.Rsp(Status=t.Status.SUCCESS),
284+
],
285+
)
286+
287+
data_confirm_rsp = c.AF.DataConfirm.Callback(
288+
Status=t.Status.SUCCESS, Endpoint=1, TSN=2
289+
)
290+
291+
request_sent = event_loop.create_future()
292+
request_sent.add_done_callback(lambda _: znp_server.send(data_confirm_rsp))
293+
294+
await app.startup(auto_form=False)
295+
296+
group = app.groups.add_group(0x1234, "test group")
297+
await group.endpoint.on_off.on()
298+
request_sent.set_result(True)
299+
300+
await app.shutdown()
301+
302+
270303
@pytest.mark.parametrize("device", [FormedLaunchpadCC26X2R1])
271304
async def test_unimplemented_zdo_converter(device, make_application, mocker):
272305
app, znp_server = make_application(server_cls=device)

zigpy_znp/api.py

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import time
12
import typing
23
import asyncio
34
import logging
@@ -398,26 +399,13 @@ async def capture_responses(self, responses):
398399
"""
399400

400401
queue = asyncio.Queue()
401-
listener = self.callback_for_responses(responses, queue.put_nowait)
402+
listener = self.callback_for_responses(responses, callback=queue.put_nowait)
402403

403404
try:
404405
yield queue
405406
finally:
406407
self.remove_listener(listener)
407408

408-
@contextlib.asynccontextmanager
409-
async def capture_responses_once(self, responses):
410-
"""
411-
Captures all matched responses in a queue within the context manager.
412-
"""
413-
414-
future, listener = self.wait_for_responses(responses, context=True)
415-
416-
try:
417-
yield future
418-
finally:
419-
self.remove_listener(listener)
420-
421409
def callback_for_responses(self, responses, callback) -> CallbackResponseListener:
422410
"""
423411
Creates a callback listener that matches any of the provided responses.
@@ -546,7 +534,7 @@ async def request(self, request: t.CommandBase, **response_params) -> t.CommandB
546534
return response
547535

548536
async def request_callback_rsp(
549-
self, *, request, callback, timeout=None, **response_params
537+
self, *, request, callback, timeout=None, background=False, **response_params
550538
):
551539
"""
552540
Sends an SREQ, gets its SRSP confirmation, and waits for its real AREQ response.
@@ -559,12 +547,41 @@ async def request_callback_rsp(
559547
from the UART and be handled in the same event loop step by ZNP.
560548
"""
561549

550+
# Every request should have a timeout to prevent deadlocks
562551
if timeout is None:
563552
timeout = self._config[conf.CONF_ZNP_CONFIG][conf.CONF_ARSP_TIMEOUT]
564553

565-
# The async context manager allows us to clean up resources upon cancellation
566-
async with self.capture_responses_once([callback]) as callback_rsp:
567-
await self.request(request, **response_params)
554+
callback_rsp, listener = self.wait_for_responses([callback], context=True)
555+
556+
if not background:
557+
try:
558+
async with async_timeout.timeout(timeout):
559+
await self.request(request, **response_params)
560+
561+
return await callback_rsp
562+
finally:
563+
self.remove_listener(listener)
564+
565+
start_time = time.time()
568566

567+
# If the SREQ/SRSP pair fails, we must cancel the AREQ listener
568+
try:
569569
async with async_timeout.timeout(timeout):
570-
return await callback_rsp
570+
request_rsp = await self.request(request, **response_params)
571+
except Exception:
572+
self.remove_listener(listener)
573+
raise
574+
575+
async def callback_handler(timeout):
576+
try:
577+
async with async_timeout.timeout(timeout):
578+
await callback_rsp
579+
finally:
580+
self.remove_listener(listener)
581+
582+
# If it succeeds, create a background task to receive the AREQ but take into
583+
# account the time it took to start the SREQ to ensure we do not grossly exceed
584+
# the timeout
585+
asyncio.create_task(callback_handler(time.time() - start_time))
586+
587+
return request_rsp

zigpy_znp/zigbee/application.py

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -31,16 +31,19 @@
3131

3232
ZDO_ENDPOINT = 0
3333

34-
PROBE_TIMEOUT = 5 # seconds
35-
STARTUP_TIMEOUT = 5 # seconds
36-
ZDO_REQUEST_TIMEOUT = 15 # seconds
37-
DATA_CONFIRM_TIMEOUT = 8 # seconds
38-
DEVICE_JOIN_MAX_DELAY = 5 # seconds
39-
NETWORK_COMMISSIONING_TIMEOUT = 30 # seconds
40-
WATCHDOG_PERIOD = 30 # seconds
34+
# All of these are in seconds
35+
PROBE_TIMEOUT = 5
36+
STARTUP_TIMEOUT = 5
37+
ZDO_REQUEST_TIMEOUT = 15
38+
DATA_CONFIRM_TIMEOUT = 8
39+
DEVICE_JOIN_MAX_DELAY = 5
40+
NETWORK_COMMISSIONING_TIMEOUT = 30
41+
WATCHDOG_PERIOD = 30
42+
BROADCAST_SEND_WAIT_DURATION = 3
43+
MULTICAST_SEND_WAIT_DURATION = 3
4144

4245
REQUEST_MAX_RETRIES = 5
43-
REQUEST_ERROR_RETRY_DELAY = 0.5 # second
46+
REQUEST_ERROR_RETRY_DELAY = 0.5 # seconds
4447

4548
# Errors that go away on their own after waiting for a bit
4649
REQUEST_TRANSIENT_ERRORS = {
@@ -1353,17 +1356,14 @@ async def _send_request_raw(
13531356
)
13541357

13551358
if dst_addr.mode == t.AddrMode.Broadcast:
1356-
# Broadcasts will not receive a confirmation but they still take time
1357-
# and use up concurrency slots
1359+
# Broadcasts will not receive a confirmation
13581360
response = await self._znp.request(
13591361
request=request, RspStatus=t.Status.SUCCESS
13601362
)
1361-
1362-
await asyncio.sleep(0.1 * 30)
13631363
else:
13641364
async with async_timeout.timeout(DATA_CONFIRM_TIMEOUT):
1365-
# Shield from cancellation to prevent requests that time out
1366-
# in higher layers from missing expected responses
1365+
# Shield from cancellation to prevent requests that time out in higher
1366+
# layers from missing expected responses
13671367
response = await asyncio.shield(
13681368
self._znp.request_callback_rsp(
13691369
request=request,
@@ -1374,6 +1374,9 @@ async def _send_request_raw(
13741374
# XXX: can this ever not match?
13751375
# Endpoint=src_ep,
13761376
),
1377+
# Multicasts eventually receive a confirmation but waiting for
1378+
# it is unnecessary
1379+
background=(dst_addr.mode == t.AddrMode.Group),
13771380
)
13781381
)
13791382

0 commit comments

Comments
 (0)