Skip to content

Keep track of what retry method actually works #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,11 @@ exclude = ["tests", "tests.*"]

[project.optional-dependencies]
testing = [
"pytest>=7.1.2",
"pytest-asyncio>=0.19.0",
"pytest>=7.3.1",
"pytest-asyncio>=0.21.0",
"pytest-timeout>=2.1.0",
"pytest-mock>=3.8.2",
"pytest-cov>=3.0.0",
"coveralls",
"pytest-mock>=3.10.0",
"pytest-cov>=4.1.0",
]

[tool.setuptools-git-versioning]
Expand Down
7 changes: 7 additions & 0 deletions tests/application/test_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,9 @@ def set_route_discovered(req):
await was_route_discovered
await zdo_req

# 6 accounts for the loopback requests
assert sum(c.value for c in app.state.counters["Retry_NONE"].values()) == 6 + 1

await app.shutdown()


Expand Down Expand Up @@ -602,6 +605,9 @@ def set_route_discovered(req):
)

await was_route_discovered
assert (
sum(c.value for c in app.state.counters["Retry_RouteDiscovery"].values()) == 1
)

await app.shutdown()

Expand Down Expand Up @@ -666,6 +672,7 @@ def data_confirm_replier(req):
)

assert was_ieee_addr_used
assert sum(c.value for c in app.state.counters["Retry_IEEEAddress"].values()) == 1

await app.shutdown()

Expand Down
51 changes: 50 additions & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import gc
import sys
import json
import typing
import asyncio
import inspect
import logging
Expand All @@ -7,6 +10,7 @@

import pytest
import zigpy.types
import zigpy.config
import zigpy.device

try:
Expand Down Expand Up @@ -42,6 +46,46 @@ def pytest_collection_modifyitems(session, config, items):
item.add_marker(pytest.mark.filterwarnings("error::RuntimeWarning"))


@pytest.hookimpl(trylast=True)
def pytest_fixture_post_finalizer(fixturedef, request) -> None:
"""Called after fixture teardown"""
if fixturedef.argname != "event_loop":
return

policy = asyncio.get_event_loop_policy()
try:
loop = policy.get_event_loop()
except RuntimeError:
loop = None
if loop is not None:
# Cleanup code based on the implementation of asyncio.run()
try:
if not loop.is_closed():
asyncio.runners._cancel_all_tasks(loop) # type: ignore[attr-defined]
loop.run_until_complete(loop.shutdown_asyncgens())
if sys.version_info >= (3, 9):
loop.run_until_complete(loop.shutdown_default_executor())
finally:
loop.close()
new_loop = policy.new_event_loop() # Replace existing event loop
# Ensure subsequent calls to get_event_loop() succeed
policy.set_event_loop(new_loop)


@pytest.fixture
def event_loop(
request: pytest.FixtureRequest,
) -> typing.Iterator[asyncio.AbstractEventLoop]:
"""Create an instance of the default event loop for each test case."""
yield asyncio.get_event_loop_policy().new_event_loop()
# Call the garbage collector to trigger ResourceWarning's as soon
# as possible (these are triggered in various __del__ methods).
# Without this, resources opened in one test can fail other tests
# when the warning is generated.
gc.collect()
# Event loop cleanup handled by pytest_fixture_post_finalizer


class ForwardingSerialTransport:
"""
Serial transport that hooks directly into a protocol
Expand Down Expand Up @@ -86,7 +130,12 @@ def __repr__(self):


def config_for_port_path(path):
return conf.CONFIG_SCHEMA({conf.CONF_DEVICE: {conf.CONF_DEVICE_PATH: path}})
return conf.CONFIG_SCHEMA(
{
conf.CONF_DEVICE: {conf.CONF_DEVICE_PATH: path},
zigpy.config.CONF_NWK_BACKUP_ENABLED: False,
}
)


@pytest.fixture
Expand Down
95 changes: 56 additions & 39 deletions zigpy_znp/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@
LOGGER = logging.getLogger(__name__)


class RetryMethod(t.enum_flag_uint8):
NONE = 0
AssocRemove = 2 << 0
RouteDiscovery = 2 << 1
LastGoodRoute = 2 << 2
IEEEAddress = 2 << 3


class ControllerApplication(zigpy.application.ControllerApplication):
SCHEMA = conf.CONFIG_SCHEMA
SCHEMA_DEVICE = conf.SCHEMA_DEVICE
Expand Down Expand Up @@ -809,19 +817,19 @@ def _find_endpoint(self, dst_ep: int, profile: int, cluster: int) -> int:

async def _send_request_raw(
self,
dst_addr,
dst_ep,
src_ep,
profile,
cluster,
sequence,
options,
radius,
data,
dst_addr: t.AddrModeAddress,
dst_ep: int,
src_ep: int,
profile: int,
cluster: int,
sequence: int,
options: c.af.TransmitOptions,
radius: int,
data: bytes,
*,
relays=None,
extended_timeout=False,
):
relays: list[int] | None = None,
extended_timeout: bool = False,
) -> None:
"""
Used by `request`/`mrequest`/`broadcast` to send a request.
Picks the correct request sending mechanism and fixes endpoint information.
Expand Down Expand Up @@ -922,9 +930,7 @@ async def _send_request_raw(

if dst_ep == ZDO_ENDPOINT or dst_addr.mode == t.AddrMode.Broadcast:
# Broadcasts and ZDO requests will not receive a confirmation
response = await self._znp.request(
request=request, RspStatus=t.Status.SUCCESS
)
await self._znp.request(request=request, RspStatus=t.Status.SUCCESS)
else:
async with async_timeout.timeout(
EXTENDED_DATA_CONFIRM_TIMEOUT
Expand Down Expand Up @@ -956,8 +962,6 @@ async def _send_request_raw(
response,
)

return response

@combine_concurrent_calls
async def _discover_route(self, nwk: t.NWK) -> None:
"""
Expand Down Expand Up @@ -1006,18 +1010,15 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:

dst_addr = t.AddrModeAddress.from_zigpy_type(packet.dst)

status = None
response = None
succeeded = False
association = None
force_relays = None

if packet.source_route is not None:
force_relays = packet.source_route

tried_assoc_remove = False
tried_route_discovery = False
tried_last_good_route = False
tried_ieee_address = False
retry_methods = RetryMethod.NONE
last_retry_method = RetryMethod.NONE

# Don't release the concurrency-limiting semaphore until we are done trying.
# There is no point in allowing requests to take turns getting buffer errors.
Expand Down Expand Up @@ -1047,7 +1048,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
if route_status.Status != c.zdo.RoutingStatus.SUCCESS:
await self._discover_route(dst_addr.address)

response = await self._send_request_raw(
await self._send_request_raw(
dst_addr=dst_addr,
dst_ep=packet.dst_ep,
src_ep=packet.src_ep,
Expand All @@ -1060,7 +1061,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
relays=force_relays,
extended_timeout=packet.extended_timeout,
)
status = response.Status
succeeded = True
break
except InvalidCommandResponse as e:
status = e.response.Status
Expand All @@ -1078,23 +1079,27 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
or dst_addr.mode not in (t.AddrMode.NWK, t.AddrMode.IEEE)
):
LOGGER.debug(
"Request failed (%s), retry attempt %s of %s",
"Request failed (%s), retry attempt %s of %s (%s)",
e,
attempt + 1,
REQUEST_MAX_RETRIES,
retry_methods.name,
)
await asyncio.sleep(3 * REQUEST_ERROR_RETRY_DELAY)
continue

# If we can't contact the device by forcing a specific route,
# there is not point in trying this more than once.
if tried_last_good_route and force_relays is not None:
# there is no point in trying this more than once.
if (
retry_methods & RetryMethod.LastGoodRoute
and force_relays is not None
):
force_relays = None

# If we fail to contact the device with its IEEE address, don't
# try again.
if (
tried_ieee_address
retry_methods & RetryMethod.IEEEAddress
and dst_addr.mode == t.AddrMode.IEEE
and device is not None
):
Expand All @@ -1111,7 +1116,7 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
status == t.Status.MAC_TRANSACTION_EXPIRED
and device is not None
and association is None
and not tried_assoc_remove
and not retry_methods & RetryMethod.AssocRemove
and self._znp.version >= 3.30
):
association = await self._znp.request(
Expand All @@ -1129,7 +1134,8 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
await self._znp.request(
c.UTIL.AssocRemove.Req(IEEE=device.ieee)
)
tried_assoc_remove = True
retry_methods |= RetryMethod.AssocRemove
last_retry_method = RetryMethod.AssocRemove

# Route discovery must be performed right after
await self._discover_route(device.nwk)
Expand All @@ -1138,39 +1144,46 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
"The UTIL.AssocRemove command is available only"
" in Z-Stack 3 releases built after 20201017"
)
elif not tried_last_good_route and device is not None:
elif (
not retry_methods & RetryMethod.LastGoodRoute
and device is not None
):
# `ZDO.SrcRtgInd` callbacks tell us the last path taken by
# messages from the device back to the coordinator. Sending
# packets backwards via this same route may work.
force_relays = (device.relays or [])[::-1]
tried_last_good_route = True
retry_methods |= RetryMethod.LastGoodRoute
last_retry_method = RetryMethod.LastGoodRoute
elif (
not tried_route_discovery
not retry_methods & RetryMethod.RouteDiscovery
and dst_addr.mode == t.AddrMode.NWK
):
# If that doesn't work, try re-discovering the route.
# While we can in theory poll and wait until it is fixed,
# letting the retry mechanism deal with it simpler.
await self._discover_route(dst_addr.address)
tried_route_discovery = True
retry_methods |= RetryMethod.RouteDiscovery
last_retry_method = RetryMethod.RouteDiscovery
elif (
not tried_ieee_address
not retry_methods & RetryMethod.IEEEAddress
and device is not None
and dst_addr.mode == t.AddrMode.NWK
):
# Try using the device's IEEE address instead of its NWK.
# If it works, the NWK will be updated when relays arrive.
tried_ieee_address = True
retry_methods |= RetryMethod.IEEEAddress
last_retry_method = RetryMethod.IEEEAddress
dst_addr = t.AddrModeAddress(
mode=t.AddrMode.IEEE,
address=device.ieee,
)

LOGGER.debug(
"Request failed (%s), retry attempt %s of %s",
"Request failed (%s), retry attempt %s of %s (%s)",
e,
attempt + 1,
REQUEST_MAX_RETRIES,
retry_methods.name,
)

# We've tried everything already so at this point just wait
Expand All @@ -1181,11 +1194,15 @@ async def send_packet(self, packet: zigpy.types.ZigbeePacket) -> None:
f" {status!r}",
status=status,
)

self.state.counters[f"Retry_{last_retry_method.name}"][
attempt
].increment()
finally:
# We *must* re-add the device association if we previously removed it but
# the request still failed. Otherwise, it may be a direct child and we will
# not be able to find it again.
if tried_assoc_remove and response is None:
if not succeeded and retry_methods & RetryMethod.AssocRemove:
await self._znp.request(
c.UTIL.AssocAdd.Req(
NWK=association.Device.shortAddr,
Expand Down