Skip to content

Commit ee33ca7

Browse files
committed
Test backup and restore tools
1 parent 2f8501d commit ee33ca7

File tree

3 files changed

+283
-7
lines changed

3 files changed

+283
-7
lines changed

tests/test_tools.py

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
import copy
2+
import json
3+
4+
import pytest
5+
6+
import zigpy_znp.types as t
7+
import zigpy_znp.commands as c
8+
9+
from zigpy_znp.types.nvids import NwkNvIds, OsalExNvIds
10+
11+
from zigpy_znp.tools.backup import main as backup
12+
from zigpy_znp.tools.restore import main as restore
13+
14+
15+
from test_api import pytest_mark_asyncio_timeout, pingable_serial_port # noqa: F401
16+
17+
from test_application import znp_server # noqa: F401
18+
19+
20+
# We use an existing backup as an NVRAM model
21+
REAL_BACKUP = {
22+
"osal": {
23+
"ADDRMGR": "998877665544332211223344",
24+
"BINDING_TABLE": "ffffffffffffffffffffffffffff",
25+
"DEVICE_LIST": "a964000001080000ff300020003c0000",
26+
"TCLK_TABLE": "00000000000000000000000000000000ff000000",
27+
"APS_KEY_DATA_TABLE": "000000000000000000000000000000000000000000000000",
28+
"NWK_SEC_MATERIAL_TABLE": "000000000000000000000000",
29+
},
30+
"nwk": {
31+
"HAS_CONFIGURED_ZSTACK3": "55",
32+
"EXTADDR": "5cacaa1c004b1200",
33+
"STARTUP_OPTION": "00",
34+
"START_DELAY": "0a",
35+
"NIB": (
36+
"0c0502331433001e0000000105018f00070002051e0000001900000000000000000000"
37+
"0095860800008010020f0f040001000000010000000099887766554433220100000000"
38+
"00000000000000000000000000000000000000000000000000000000000000000f0300"
39+
"01780a0100000089460000"
40+
),
41+
"POLL_RATE_OLD16": "b80b",
42+
"POLL_RATE": (
43+
"b80b0000b8010000640000006400000000000000e80300000000000000000000"
44+
"e859002001000000"
45+
),
46+
"DATA_RETRIES": "02",
47+
"POLL_FAILURE_RETRIES": "01",
48+
"STACK_PROFILE": "02",
49+
"INDIRECT_MSG_TIMEOUT": "07",
50+
"ROUTE_EXPIRY_TIME": "1e",
51+
"EXTENDED_PAN_ID": "9988776655443322",
52+
"BCAST_RETRIES": "02",
53+
"PASSIVE_ACK_TIMEOUT": "05",
54+
"BCAST_DELIVERY_TIME": "1e",
55+
"CONCENTRATOR_ENABLE": "01",
56+
"CONCENTRATOR_DISCOVERY": "78",
57+
"CONCENTRATOR_RADIUS_OLD16": "0a",
58+
"CONCENTRATOR_RC": "01",
59+
"NWK_MGR_MODE": "01",
60+
"SRC_RTG_EXPIRY_TIME": "ff",
61+
"ROUTE_DISCOVERY_TIME": "05",
62+
"NWK_ACTIVE_KEY_INFO": "0011223344556677889911223344556677",
63+
"NWK_ALTERN_KEY_INFO": "0011223344556677889911223344556677",
64+
"ROUTER_OFF_ASSOC_CLEANUP": "00",
65+
"NWK_LEAVE_REQ_ALLOWED": "01",
66+
"NWK_CHILD_AGE_ENABLE": "00",
67+
"GROUP_TABLE": "0000ffffffffffffffffffffffffffffffffffffffff",
68+
"APS_FRAME_RETRIES": "03",
69+
"APS_ACK_WAIT_DURATION": "b80b",
70+
"APS_ACK_WAIT_MULTIPLIER": "02",
71+
"BINDING_TIME": "803e",
72+
"APS_USE_EXT_PANID": "0000000000000000",
73+
"APS_USE_INSECURE_JOIN": "01",
74+
"COMMISSIONED_NWK_ADDR": "feff",
75+
"APS_NONMEMBER_RADIUS": "02",
76+
"APS_LINK_KEY_TABLE": "0000000000000000000000000000000000000000",
77+
"APS_DUPREJ_TIMEOUT_INC": "e803",
78+
"APS_DUPREJ_TIMEOUT_COUNT": "0a",
79+
"APS_DUPREJ_TABLE_SIZE": "0500",
80+
"NWK_PARENT_INFO": "01",
81+
"NWK_ENDDEV_TIMEOUT_DEF": "08",
82+
"END_DEV_TIMEOUT_VALUE": "08",
83+
"END_DEV_CONFIGURATION": "00",
84+
"BDBNODEISONANETWORK": "01",
85+
"PRECFGKEY": "a1a2a3a4a5a6a7a8a1a2a3a4a5a6a7a8",
86+
"PRECFGKEYS_ENABLE": "01",
87+
"SECURE_PERMIT_JOIN": "01",
88+
"APS_LINK_KEY_TYPE": "01",
89+
"APS_ALLOW_R19_SECURITY": "00",
90+
"USE_DEFAULT_TCLK": "01",
91+
"TRUSTCENTER_ADDR": "ffffffffffffffff",
92+
"USERDESC": "0000000000000000000000000000000000",
93+
"NWKKEY": "0011223344556677889911223344556677df010006930400",
94+
"PANID": "9586",
95+
"CHANLIST": "00801002",
96+
"LEAVE_CTRL": "00",
97+
"SCAN_DURATION": "04",
98+
"LOGICAL_TYPE": "00",
99+
"NWKMGR_MIN_TX": "14",
100+
"ZDO_DIRECT_CB": "01",
101+
"SAPI_ENDPOINT": "e0",
102+
"TCLK_SEED": "a8a1a2a3a4a5a6a7a8a1a2a3a4a5a6a7",
103+
},
104+
}
105+
106+
107+
@pytest.fixture
108+
def openable_serial_znp_server(mocker, znp_server): # noqa: F811
109+
# The fake serial port is "opened" by argparse, which we have to allow
110+
def fixed_open(
111+
file,
112+
mode="r",
113+
buffering=-1,
114+
encoding=None,
115+
errors=None,
116+
newline=None,
117+
closefd=True,
118+
opener=None,
119+
):
120+
if file == znp_server._port_path:
121+
122+
class FakeFile:
123+
name = file
124+
125+
def close(self):
126+
pass
127+
128+
return FakeFile()
129+
130+
return open(file, mode, buffering, encoding, errors, newline, closefd, opener)
131+
132+
mocker.patch("argparse.open", new=fixed_open)
133+
134+
return znp_server
135+
136+
137+
@pytest_mark_asyncio_timeout(seconds=5)
138+
async def test_backup(openable_serial_znp_server, tmp_path):
139+
def osal_nv_read(req):
140+
nvid = NwkNvIds(req.Id).name
141+
142+
if nvid not in REAL_BACKUP["nwk"]:
143+
return c.SYS.OSALNVRead.Rsp(Status=t.Status.INVALID_PARAMETER, Value=b"")
144+
145+
value = bytes.fromhex(REAL_BACKUP["nwk"][nvid])
146+
147+
return c.SYS.OSALNVRead.Rsp(Status=t.Status.SUCCESS, Value=value[req.Offset :])
148+
149+
def nv_length(req):
150+
nvid = OsalExNvIds(req.ItemId).name
151+
152+
if nvid not in REAL_BACKUP["osal"]:
153+
return c.SYS.NVLength.Rsp(Length=0)
154+
155+
value = bytes.fromhex(REAL_BACKUP["osal"][nvid])
156+
157+
return c.SYS.NVLength.Rsp(Length=len(value))
158+
159+
def nv_read(req):
160+
nvid = OsalExNvIds(req.ItemId).name
161+
value = bytes.fromhex(REAL_BACKUP["osal"][nvid])
162+
163+
return c.SYS.NVRead.Rsp(
164+
Status=t.Status.SUCCESS, Value=value[req.Offset :][: req.Length]
165+
)
166+
167+
openable_serial_znp_server.reply_to(
168+
request=c.SYS.OSALNVRead.Req(partial=True), responses=[osal_nv_read],
169+
)
170+
171+
openable_serial_znp_server.reply_to(
172+
request=c.SYS.NVLength.Req(SysId=1, SubId=0, partial=True),
173+
responses=[nv_length],
174+
)
175+
176+
openable_serial_znp_server.reply_to(
177+
request=c.SYS.NVRead.Req(SysId=1, SubId=0, partial=True), responses=[nv_read],
178+
)
179+
180+
backup_file = tmp_path / "backup.json"
181+
await backup([openable_serial_znp_server._port_path, "-o", str(backup_file)])
182+
183+
# The backup JSON written to disk should be an exact copy of our fake NVRAM
184+
assert json.loads(backup_file.read_text()) == REAL_BACKUP
185+
186+
187+
@pytest_mark_asyncio_timeout(seconds=5)
188+
async def test_restore(openable_serial_znp_server, tmp_path):
189+
simulated_nvram = {"osal": {}, "nwk": {}}
190+
191+
def osal_nv_item_init(req):
192+
nvid = NwkNvIds(req.Id)
193+
194+
# We have one special value fail
195+
if nvid == NwkNvIds.SAS_TC_ADDR:
196+
return c.SYS.OSALNVItemInit.Rsp(Status=t.Status.NV_OPER_FAILED)
197+
198+
assert len(req.Value) == req.ItemLen
199+
200+
simulated_nvram["nwk"][nvid.name] = bytearray(req.Value)
201+
202+
return c.SYS.OSALNVItemInit.Rsp(Status=t.Status.SUCCESS)
203+
204+
def osal_nv_write(req):
205+
nvid = NwkNvIds(req.Id)
206+
207+
# We have one special value fail
208+
if nvid == NwkNvIds.SAS_TC_ADDR:
209+
return c.SYS.OSALNVWrite.Rsp(Status=t.Status.NV_OPER_FAILED)
210+
211+
assert nvid.name in simulated_nvram["nwk"]
212+
assert len(req.Value) + req.Offset <= len(simulated_nvram["nwk"][nvid.name])
213+
214+
simulated_nvram["nwk"][nvid.name][
215+
req.Offset : req.Offset + len(req.Value)
216+
] = req.Value
217+
218+
return c.SYS.OSALNVWrite.Rsp(Status=t.Status.SUCCESS)
219+
220+
def nv_write(req):
221+
nvid = OsalExNvIds(req.ItemId)
222+
223+
# We have one special value fail
224+
if nvid == OsalExNvIds.TCLK_IC_TABLE:
225+
return c.SYS.NVWrite.Rsp(Status=t.Status.NV_OPER_FAILED)
226+
227+
assert req.Offset == 0
228+
229+
simulated_nvram["osal"][nvid.name] = req.Value
230+
231+
return c.SYS.NVWrite.Rsp(Status=t.Status.SUCCESS)
232+
233+
openable_serial_znp_server.reply_to(
234+
request=c.SYS.OSALNVItemInit.Req(partial=True), responses=[osal_nv_item_init],
235+
)
236+
237+
openable_serial_znp_server.reply_to(
238+
request=c.SYS.OSALNVWrite.Req(partial=True), responses=[osal_nv_write],
239+
)
240+
241+
openable_serial_znp_server.reply_to(
242+
request=c.SYS.NVWrite.Req(SysId=1, SubId=0, partial=True), responses=[nv_write],
243+
)
244+
245+
openable_serial_znp_server.reply_to(
246+
request=c.SYS.ResetReq.Req(Type=t.ResetType.Soft),
247+
responses=[
248+
c.SYS.ResetInd.Callback(
249+
Reason=t.ResetReason.PowerUp,
250+
TransportRev=2,
251+
ProductId=1,
252+
MajorRel=2,
253+
MinorRel=7,
254+
MaintRel=1,
255+
)
256+
],
257+
)
258+
259+
# These NVIDs don't exist
260+
backup_obj = copy.deepcopy(REAL_BACKUP)
261+
backup_obj["osal"]["TCLK_IC_TABLE"] = "00"
262+
backup_obj["nwk"]["SAS_TC_ADDR"] = "00"
263+
264+
backup_file = tmp_path / "backup.json"
265+
backup_file.write_text(json.dumps(backup_obj))
266+
267+
await restore([openable_serial_znp_server._port_path, "-i", str(backup_file)])
268+
269+
# Convert every value to its hex representation to match the backup format
270+
simulated_nvram_hex = {
271+
cls: {k: v.hex() for k, v in obj.items()}
272+
for cls, obj in simulated_nvram.items()
273+
}
274+
275+
# The backup JSON written to disk should be an exact copy of our original fake NVRAM
276+
assert simulated_nvram_hex == REAL_BACKUP

zigpy_znp/tools/backup.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,4 +82,4 @@ async def main(argv):
8282

8383

8484
if __name__ == "__main__":
85-
asyncio.run(main(sys.argv[1:]))
85+
asyncio.run(main(sys.argv[1:])) # pragma: no cover

zigpy_znp/tools/restore.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,12 @@ async def restore(radio_path, backup):
2929
value = bytes.fromhex(value)
3030

3131
# XXX: are any NVIDs not filled all the way?
32-
init_rsp = await znp.request(
33-
c.SYS.OSALNVItemInit.Req(Id=nvid, ItemLen=len(value), Value=value)
34-
)
35-
assert init_rsp.Status in (t.Status.SUCCESS, t.Status.NV_ITEM_UNINIT)
36-
3732
try:
33+
await znp.request(
34+
c.SYS.OSALNVItemInit.Req(Id=nvid, ItemLen=len(value), Value=value),
35+
RspStatus=t.Status.SUCCESS,
36+
)
37+
3838
await znp.nvram_write(nvid, value)
3939
except InvalidCommandResponse:
4040
LOGGER.warning("Write failed for %s = %s", nvid, value)
@@ -77,4 +77,4 @@ async def main(argv):
7777

7878

7979
if __name__ == "__main__":
80-
asyncio.run(main(sys.argv[1:]))
80+
asyncio.run(main(sys.argv[1:])) # pragma: no cover

0 commit comments

Comments
 (0)