Skip to content

Commit 3497f79

Browse files
committed
Parse the entire NIB structure for both 3.0.1 and newer versions
1 parent cf1936b commit 3497f79

File tree

6 files changed

+382
-37
lines changed

6 files changed

+382
-37
lines changed

tests/test_nib.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import pytest
2+
3+
import zigpy_znp.types as t
4+
5+
from zigpy_znp.znp.nib import (
6+
NIB,
7+
OldNIB,
8+
parse_nib,
9+
NwkState8,
10+
NwkState16,
11+
PaddingByte,
12+
Empty,
13+
)
14+
15+
16+
NEW_NIB = bytes.fromhex(
17+
"""
18+
790502331433001e0000000105018f00070002051e000000190000000000000000000000958608000080
19+
10020f0f0400010000000100000000a860ca53db3bc0a801000000000000000000000000000000000000
20+
0000000000000000000000000000000000000f030001780a0100000020470000"""
21+
)
22+
23+
OLD_NIB = bytes.fromhex(
24+
"""
25+
5f0502101410001e0000000105018f070002051e0000140000000000000000000085d208000010020f0f
26+
05000100000001000000008533ce1c004b12000100000000000000000000000000000000000000000000
27+
00000000000000000000000000003c030001780a010000170000"""
28+
)
29+
30+
31+
def test_nwk_state():
32+
assert NwkState8._member_type_ == t.uint8_t
33+
assert NwkState16._member_type_ == t.uint16_t
34+
35+
# They should be otherwise identical
36+
assert NwkState8._value2member_map_ == NwkState16._value2member_map_
37+
38+
39+
def test_padding_byte():
40+
with pytest.raises(ValueError):
41+
PaddingByte.deserialize(b"")
42+
43+
with pytest.raises(ValueError):
44+
PaddingByte(b"ab")
45+
46+
with pytest.raises(ValueError):
47+
PaddingByte(b"")
48+
49+
assert PaddingByte.deserialize(b"abc") == (PaddingByte(b"a"), b"bc")
50+
assert PaddingByte.deserialize(b"a") == (PaddingByte(b"a"), b"")
51+
52+
53+
def test_empty():
54+
with pytest.raises(ValueError):
55+
Empty(b"a")
56+
57+
assert Empty.deserialize(b"abc") == (Empty(), b"abc")
58+
assert Empty.deserialize(b"") == (Empty(), b"")
59+
60+
61+
def test_nib_classes():
62+
old_nib_nwk_state = next(a for a in OldNIB.__attrs_attrs__ if a.type == NwkState8)
63+
64+
# The old NIB should be the new NIB, without padding and with a shorter struct
65+
# integer type.
66+
fixed_new_nib = [
67+
a if a.type != NwkState16 else old_nib_nwk_state
68+
for a in NIB.__attrs_attrs__
69+
if a.type is not PaddingByte
70+
]
71+
72+
assert fixed_new_nib == list(OldNIB.__attrs_attrs__)
73+
74+
75+
def test_nib_detection():
76+
assert isinstance(parse_nib(NEW_NIB), NIB)
77+
assert isinstance(parse_nib(OLD_NIB), OldNIB)
78+
79+
with pytest.raises(ValueError):
80+
parse_nib(NEW_NIB + b"\x00")
81+
82+
with pytest.raises(ValueError):
83+
parse_nib(OLD_NIB + b"\x00")
84+
85+
86+
def test_nib_parsing():
87+
new_nib, remaining = NIB.deserialize(NEW_NIB)
88+
assert not remaining
89+
90+
old_nib, remaining = OldNIB.deserialize(OLD_NIB)
91+
assert not remaining
92+
93+
assert new_nib.serialize() == NEW_NIB
94+
assert old_nib.serialize() == OLD_NIB
95+
96+
# Superficially validate that the NIBs were parsed correctly
97+
for nib in (new_nib, old_nib):
98+
assert nib.nwkProtocolVersion == 2
99+
100+
# Make sure the channel list is valid
101+
assert nib.channelList | t.Channels.ALL_CHANNELS == t.Channels.ALL_CHANNELS
102+
103+
# Make sure the logical channel is contained within the channel mask
104+
assert (
105+
t.Channels.from_channel_list([nib.nwkLogicalChannel]) | nib.channelList
106+
== nib.channelList
107+
)
108+
109+
assert nib.nwkKeyLoaded
110+
assert nib.nwkIsConcentrator
111+
assert nib.nwkManagerAddr == t.NWK(0x0000)
112+
assert nib.nwkCoordAddress == t.NWK(0x0000)

tests/test_types_struct.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,6 @@ class Nested(t.Struct):
8484
assert nstd.cmd.id_0 == 2
8585
assert nstd.cmd.id_1 == 3
8686

87-
test_2 = Nested((4, 5))
88-
assert test_2.cmd.id_0 == 4
89-
assert test_2.cmd.id_1 == 5
90-
9187

9288
def test_struct_converter_2():
9389
@attr.s

zigpy_znp/types/struct.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,14 @@ def deserialize(cls, data: bytes):
1717

1818
return cls(*args), data
1919

20+
@classmethod
21+
def fields(cls):
22+
return attr.fields(cls)
23+
2024
@classmethod
2125
def schema(cls):
2226
"""Return schema of the class."""
23-
return (a.type for a in attr.fields(cls))
27+
return (a.type for a in cls.fields())
2428

2529
def serialize(self):
2630
"""Serialize Structure."""
@@ -29,7 +33,7 @@ def serialize(self):
2933

3034
def as_tuple(self):
3135
"""Return tuple of fields values."""
32-
names = (f.name for f in attr.fields(self.__class__))
36+
names = (f.name for f in self.fields())
3337
return (getattr(self, n) for n in names)
3438

3539
@staticmethod
@@ -39,9 +43,7 @@ def converter(_type) -> typing.Callable:
3943
def converter(input):
4044
if isinstance(input, _type):
4145
return input
42-
try:
43-
return _type(*input)
44-
except TypeError:
45-
return _type(input)
46+
47+
return _type(input)
4648

4749
return converter

zigpy_znp/zigbee/application.py

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,11 @@
2121
import zigpy_znp.config as conf
2222
import zigpy_znp.types as t
2323
import zigpy_znp.commands as c
24+
2425
from zigpy_znp.exceptions import InvalidCommandResponse
2526

2627
from zigpy_znp.api import ZNP
28+
from zigpy_znp.znp.nib import parse_nib
2729
from zigpy_znp.types.nvids import NwkNvIds
2830

2931

@@ -420,22 +422,20 @@ async def startup(self, auto_form=False):
420422
endpoint=100, profile_id=zigpy.profiles.zll.PROFILE_ID, device_id=0x0005
421423
)
422424

423-
# Structure is in `zstack/stack/nwk/nwk.h`.
424-
#
425-
# This is hacky but since there are exactly two distinct firmwares, it doesn't
426-
# make sense to try and write some fancys struct padding guesser.
427-
nib = await self._znp.nvram_read(NwkNvIds.NIB)
428-
429-
if len(nib) == 116:
430-
# Structs aligned to 16 bits: CC26x2, CC13X2
431-
self._channel = nib[24]
432-
self._channels, _ = t.Channels.deserialize(nib[40:44])
433-
elif len(nib) == 110:
434-
# No struct alignment: CC2531
435-
self._channel = nib[22]
436-
self._channels, _ = t.Channels.deserialize(nib[36:40])
437-
else:
438-
LOGGER.warning("Could not extract channel information from NIB: %r", nib)
425+
nib = parse_nib(await self._znp.nvram_read(NwkNvIds.NIB))
426+
LOGGER.debug("Parsed NIB: %s", nib)
427+
428+
self._channel = nib.nwkLogicalChannel
429+
self._channels = nib.channelList
430+
self._pan_id = nib.nwkPanId
431+
self._ext_pan_id = nib.extendedPANID
432+
self._nwk = nib.nwkDevAddress
433+
434+
LOGGER.info(
435+
"Using channel mask %s, currently on channel %d",
436+
self.channels,
437+
self.channel,
438+
)
439439

440440
async def update_network(
441441
self,
@@ -481,17 +481,11 @@ async def update_network(
481481
self._channels = channels
482482

483483
if channel is not None:
484-
# XXX: We modify the logical channel value directly in the NIB
485-
nib = bytearray(await self._znp.nvram_read(NwkNvIds.NIB))
486-
487-
if len(nib) == 116:
488-
nib[24] = channel
489-
elif len(nib) == 110:
490-
nib[22] = channel
491-
else:
492-
raise RuntimeError(f"Cannot set channel in unknown NIB struct: {nib!r}")
493-
494-
await self._znp.nvram_write(NwkNvIds.NIB, nib)
484+
# We modify the logical channel value directly in the NIB.
485+
# Does this actually work?
486+
nib = parse_nib(await self._znp.nvram_read(NwkNvIds.NIB))
487+
nib.nwkLogicalChannel = channel
488+
await self._znp.nvram_write(NwkNvIds.NIB, nib.serialize())
495489

496490
self._channel = channel
497491

zigpy_znp/znp/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)