Skip to content

Commit 27db841

Browse files
committed
Properly handle the extended timeout in backgrounded requests
1 parent 4ac608c commit 27db841

File tree

3 files changed

+61
-52
lines changed

3 files changed

+61
-52
lines changed

tests/api/test_request.py

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -112,43 +112,45 @@ async def test_callback_rsp_cleanup_timeout_internal(background, connected_znp):
112112
assert not znp._listeners
113113

114114

115-
async def test_callback_rsp_cleanup_background_error(connected_znp):
115+
async def test_callback_rsp_background_timeout(connected_znp, mocker):
116116
znp, znp_server = connected_znp
117117
znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_SREQ_TIMEOUT] = 0.1
118-
znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_ARSP_TIMEOUT] = 0.1
118+
znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_ARSP_TIMEOUT] = 1.0
119119

120-
assert not znp._listeners
120+
mocker.spy(znp, "_unhandled_command")
121121

122-
# This request will timeout because we didn't send anything back
123-
with pytest.raises(asyncio.TimeoutError):
124-
await znp.request_callback_rsp(
125-
request=c.UTIL.TimeAlive.Req(),
126-
callback=c.SYS.ResetInd.Callback(partial=True),
127-
background=True,
122+
async def replier(req):
123+
# SREQ reply works
124+
await asyncio.sleep(0.05)
125+
yield c.UTIL.TimeAlive.Rsp(Seconds=123)
126+
127+
# And the callback will arrive before the AREQ timeout
128+
await asyncio.sleep(0.9)
129+
yield c.SYS.ResetInd.Callback(
130+
Reason=t.ResetReason.PowerUp,
131+
TransportRev=0x00,
132+
ProductId=0x12,
133+
MajorRel=0x01,
134+
MinorRel=0x02,
135+
MaintRel=0x03,
128136
)
129137

130-
# We should be cleaned up
131-
assert not znp._listeners
132-
138+
reply = znp_server.reply_once_to(c.UTIL.TimeAlive.Req(), responses=replier)
133139

134-
async def test_callback_rsp_cleanup_background_timeout(connected_znp):
135-
znp, znp_server = connected_znp
136-
znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_SREQ_TIMEOUT] = 0.1
137-
znp._config[conf.CONF_ZNP_CONFIG][conf.CONF_ARSP_TIMEOUT] = 0.1
138-
139-
assert not znp._listeners
140+
await znp.request_callback_rsp(
141+
request=c.UTIL.TimeAlive.Req(),
142+
callback=c.SYS.ResetInd.Callback(partial=True),
143+
background=True,
144+
)
140145

141-
# This request will timeout because we didn't send anything back
142-
with pytest.raises(asyncio.TimeoutError):
143-
await znp.request_callback_rsp(
144-
request=c.UTIL.TimeAlive.Req(),
145-
callback=c.SYS.ResetInd.Callback(partial=True),
146-
background=True,
147-
)
146+
await reply
148147

149148
# We should be cleaned up
150149
assert not znp._listeners
151150

151+
# Command was properly handled
152+
assert len(znp._unhandled_command.mock_calls) == 0
153+
152154

153155
async def test_callback_rsp_cleanup_concurrent(connected_znp, event_loop, mocker):
154156
znp, znp_server = connected_znp

tests/conftest.py

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import json
22
import asyncio
3+
import inspect
34
import logging
45
import pathlib
56
import contextlib
@@ -263,49 +264,52 @@ class BaseServerZNP(ZNP):
263264
align_structs = False
264265
version = None
265266

266-
def _flatten_responses(self, request, responses):
267+
async def _flatten_responses(self, request, responses):
267268
if responses is None:
268269
return
269270
elif isinstance(responses, t.CommandBase):
270271
yield responses
272+
elif inspect.iscoroutinefunction(responses):
273+
async for rsp in responses(request):
274+
yield rsp
275+
elif inspect.isasyncgen(responses):
276+
async for rsp in responses:
277+
yield rsp
271278
elif callable(responses):
272-
yield from self._flatten_responses(request, responses(request))
279+
async for rsp in self._flatten_responses(request, responses(request)):
280+
yield rsp
273281
else:
274282
for response in responses:
275-
yield from self._flatten_responses(request, response)
283+
async for rsp in self._flatten_responses(request, response):
284+
yield rsp
285+
286+
async def _send_responses(self, request, responses):
287+
async for response in self._flatten_responses(request, responses):
288+
await asyncio.sleep(0.001)
289+
LOGGER.debug("Replying to %s with %s", request, response)
290+
self.send(response)
276291

277292
def reply_once_to(self, request, responses, *, override=False):
278293
if override:
279294
self._listeners[request.header].clear()
280295

281-
future = self.wait_for_response(request)
282-
called_future = asyncio.get_running_loop().create_future()
296+
request_future = self.wait_for_response(request)
283297

284298
async def replier():
285-
request = await future
286-
287-
for response in self._flatten_responses(request, responses):
288-
await asyncio.sleep(0.001)
289-
LOGGER.debug("Replying to %s with %s", request, response)
290-
self.send(response)
299+
request = await request_future
300+
await self._send_responses(request, responses)
291301

292-
called_future.set_result(request)
302+
return request
293303

294-
asyncio.create_task(replier())
295-
296-
return called_future
304+
return asyncio.create_task(replier())
297305

298306
def reply_to(self, request, responses, *, override=False):
299307
if override:
300308
self._listeners[request.header].clear()
301309

302310
async def callback(request):
303311
callback.call_count += 1
304-
305-
for response in self._flatten_responses(request, responses):
306-
await asyncio.sleep(0.001)
307-
LOGGER.debug("Replying to %s with %s", request, response)
308-
self.send(response)
312+
await self._send_responses(request, responses)
309313

310314
callback.call_count = 0
311315

zigpy_znp/api.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -815,6 +815,7 @@ async def request_callback_rsp(
815815

816816
callback_rsp, listener = self.wait_for_responses([callback], context=True)
817817

818+
# Typical request/response/callbacks are not backgrounded
818819
if not background:
819820
try:
820821
async with async_timeout.timeout(timeout):
@@ -824,26 +825,28 @@ async def request_callback_rsp(
824825
finally:
825826
self.remove_listener(listener)
826827

827-
start_time = time.time()
828+
# Backgrounded callback handlers need to respect the provided timeout
829+
start_time = time.monotonic()
828830

829-
# If the SREQ/SRSP pair fails, we must cancel the AREQ listener
830831
try:
831832
async with async_timeout.timeout(timeout):
832833
request_rsp = await self.request(request, **response_params)
833834
except Exception:
835+
# If the SREQ/SRSP pair fails, we must cancel the AREQ listener
834836
self.remove_listener(listener)
835837
raise
836838

837-
async def callback_handler(timeout):
839+
# If it succeeds, create a background task to receive the AREQ but take into
840+
# account the time it took to start the SREQ to ensure we do not grossly exceed
841+
# the timeout
842+
async def callback_catcher(timeout):
838843
try:
839844
async with async_timeout.timeout(timeout):
840845
await callback_rsp
841846
finally:
842847
self.remove_listener(listener)
843848

844-
# If it succeeds, create a background task to receive the AREQ but take into
845-
# account the time it took to start the SREQ to ensure we do not grossly exceed
846-
# the timeout
847-
asyncio.create_task(callback_handler(time.time() - start_time))
849+
callback_timeout = max(0, timeout - (time.monotonic() - start_time))
850+
asyncio.create_task(callback_catcher(callback_timeout))
848851

849852
return request_rsp

0 commit comments

Comments
 (0)