Skip to content

Properly handle older Z-Stack SYS.SetTxPower responses #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 36 additions & 8 deletions tests/application/test_startup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import zigpy_znp.config as conf
import zigpy_znp.commands as c
from zigpy_znp.api import ZNP
from zigpy_znp.exceptions import InvalidCommandResponse
from zigpy_znp.types.nvids import ExNvIds, OsalNvIds

from ..conftest import (
Expand Down Expand Up @@ -163,19 +164,46 @@ async def test_write_nvram(device, make_application, mocker):


@pytest.mark.parametrize("device", FORMED_DEVICES)
async def test_tx_power(device, make_application):
@pytest.mark.parametrize("succeed", [True, False])
async def test_tx_power(device, succeed, make_application):
app, znp_server = make_application(
server_cls=device,
client_config={conf.CONF_ZNP_CONFIG: {conf.CONF_TX_POWER: 19}},
)

set_tx_power = znp_server.reply_once_to(
request=c.SYS.SetTxPower.Req(TXPower=19),
responses=[c.SYS.SetTxPower.Rsp(Status=t.Status.SUCCESS)],
)

await app.startup(auto_form=False)
await set_tx_power
if device.version == 3.30:
if succeed:
set_tx_power = znp_server.reply_once_to(
request=c.SYS.SetTxPower.Req(TXPower=19),
responses=[c.SYS.SetTxPower.Rsp(StatusOrPower=t.Status.SUCCESS)],
)
else:
set_tx_power = znp_server.reply_once_to(
request=c.SYS.SetTxPower.Req(TXPower=19),
responses=[
c.SYS.SetTxPower.Rsp(StatusOrPower=t.Status.INVALID_PARAMETER)
],
)
else:
if succeed:
set_tx_power = znp_server.reply_once_to(
request=c.SYS.SetTxPower.Req(TXPower=19),
responses=[c.SYS.SetTxPower.Rsp(StatusOrPower=19)],
)
else:
set_tx_power = znp_server.reply_once_to(
request=c.SYS.SetTxPower.Req(TXPower=19),
responses=[c.SYS.SetTxPower.Rsp(StatusOrPower=-1)], # adjusted
)

if device.version == 3.30 and not succeed:
with pytest.raises(InvalidCommandResponse):
await app.startup(auto_form=False)

await set_tx_power
else:
await app.startup(auto_form=False)
await set_tx_power

await app.shutdown()

Expand Down
9 changes: 5 additions & 4 deletions zigpy_znp/commands/sys.py
Original file line number Diff line number Diff line change
Expand Up @@ -418,10 +418,11 @@ class SYS(t.CommandsBase, subsystem=t.Subsystem.SYS):
t.CommandType.SREQ,
0x14,
req_schema=(t.Param("TXPower", t.int8s, "Requested TX power setting, in dBm"),),
# While the docs say "the returned TX power is the actual setting applied to
# the radio - nearest characterized value for the specific radio.", the code
# matches the documentation.
rsp_schema=t.STATUS_SCHEMA,
# XXX: Z-Stack 3.30+ returns SUCCESS or INVALID_PARAMETER.
# Z-Stack 1.2 and 3.0 return the cloest TX power setting.
rsp_schema=(
t.Param("StatusOrPower", t.int8s, "Status code or applied power setting"),
),
)

# initialize the statistics table in NV memory
Expand Down
2 changes: 1 addition & 1 deletion zigpy_znp/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def validator(v):
vol.Optional(CONF_ZNP_CONFIG, default={}): vol.Schema(
{
vol.Optional(CONF_TX_POWER, default=None): vol.Any(
None, vol.All(int, vol.Range(min=-22, max=19))
None, vol.All(int, vol.Range(min=-22, max=22))
),
vol.Optional(CONF_SREQ_TIMEOUT, default=15): VolPositiveNumber,
vol.Optional(CONF_ARSP_TIMEOUT, default=30): VolPositiveNumber,
Expand Down
25 changes: 20 additions & 5 deletions zigpy_znp/zigbee/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,11 +254,7 @@ async def _startup(self, auto_form=False, force_form=False, read_only=False):
# At this point the device state should the same, regardless of whether we just
# formed a new network or are restoring one
if self.znp_config[conf.CONF_TX_POWER] is not None:
dbm = self.znp_config[conf.CONF_TX_POWER]

await self._znp.request(
c.SYS.SetTxPower.Req(TXPower=dbm), RspStatus=t.Status.SUCCESS
)
await self.set_tx_power(dbm=self.znp_config[conf.CONF_TX_POWER])

# Both versions of Z-Stack use this callback
started_as_coordinator = self._znp.wait_for_response(
Expand Down Expand Up @@ -388,6 +384,25 @@ async def update_pan_id(self, pan_id: t.uint16_t) -> None:
await self._znp.nvram.osal_write(OsalNvIds.NIB, nib)
await self._znp.nvram.osal_write(OsalNvIds.PANID, pan_id)

async def set_tx_power(self, dbm: int) -> None:
"""
Sets the radio TX power.
"""

rsp = await self._znp.request(c.SYS.SetTxPower.Req(TXPower=dbm))

if self._znp.version >= 3.30 and rsp.StatusOrPower != t.Status.SUCCESS:
# Z-Stack 3's response indicates success or failure
raise InvalidCommandResponse(
f"Failed to set TX power: {t.Status(rsp.StatusOrPower)!r}", rsp
)
elif self._znp.version < 3.30 and rsp.StatusOrPower != dbm:
# Old Z-Stack releases used the response status field to indicate the power
# setting that was actually applied
LOGGER.warning(
"Requested TX power %d was adjusted to %d", dbm, rsp.StatusOrPower
)

async def form_network(self):
"""
Clears the current config and forms a new network with a random network key,
Expand Down