Skip to content

Commit 94fef2c

Browse files
committed
Clean up TCLK seed recomputation
1 parent 49ce224 commit 94fef2c

File tree

3 files changed

+65
-56
lines changed

3 files changed

+65
-56
lines changed

tests/tools/test_network_backup_restore.py

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -303,26 +303,29 @@ async def test_tc_frame_counter_zstack33(make_connected_znp):
303303
assert (await security.read_tc_frame_counter(znp)) == 0x98765432
304304

305305

306-
def ieee_and_key(text):
306+
def ieee_and_key(text) -> zigpy.state.Key:
307307
ieee, key = text.replace(":", "").split("|")
308308

309-
return t.EUI64(bytes.fromhex(ieee)[::-1]), t.KeyData(bytes.fromhex(key))
309+
return zigpy.state.Key(
310+
partner_ieee=t.EUI64(bytes.fromhex(ieee)[::-1]),
311+
key=t.KeyData(bytes.fromhex(key)),
312+
)
310313

311314

312315
def test_seed_candidate_finding_simple():
313-
ieee1, key1 = ieee_and_key("0011223344556677|000102030405060708090a0b0c0d0e0f")
314-
ieee2, key2 = ieee_and_key("1111223344556677|101112131415161718191a1b1c1d1e1f")
316+
k1 = ieee_and_key("0011223344556677|000102030405060708090a0b0c0d0e0f")
317+
k2 = ieee_and_key("1111223344556677|101112131415161718191a1b1c1d1e1f")
315318

316-
(c1, s1), (c2, s2) = security.iter_seed_candidates([(ieee1, key1), (ieee2, key2)])
319+
(count1, seed1), (count2, seed2) = security.iter_seed_candidates([k1, k2])
317320

318-
assert c1 == c2 == 1
321+
assert count1 == count2 == 1
319322

320-
sh1 = security.find_key_shift(ieee1, key1, s1)
321-
sh2 = security.find_key_shift(ieee2, key2, s2)
322-
assert sh1 is not None and sh2 is not None
323+
shift1 = security.find_key_shift(k1.partner_ieee, k1.key, seed1)
324+
shift2 = security.find_key_shift(k2.partner_ieee, k2.key, seed2)
325+
assert shift1 is not None and shift2 is not None
323326

324-
assert security.compute_key(ieee1, s1, sh1) == key1
325-
assert security.compute_key(ieee2, s2, sh2) == key2
327+
assert security.compute_key(k1.partner_ieee, seed1, shift1) == k1.key
328+
assert security.compute_key(k2.partner_ieee, seed2, shift2) == k2.key
326329

327330

328331
def min_rotate(lst):
@@ -366,7 +369,7 @@ def test_seed_candidate_finding_complex():
366369
# One seed generated all but one of the keys, so there are 24 equally valid seeds.
367370
# They are all really rotations of the same seed.
368371
assert [c for c, s in candidates].count(24) == 24
369-
assert len({min_rotate(s) for c, s in candidates if c == 24}) == 1
372+
assert len({min_rotate(bytes(s)) for c, s in candidates if c == 24}) == 1
370373

371374
# And one just for the bogus entry
372375
assert [c[0] for c in candidates].count(1) == 1

zigpy_znp/api.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -407,17 +407,25 @@ async def write_network_info(
407407

408408
LOGGER.debug("Writing children and keys")
409409

410-
new_tclk_seed = await security.write_devices(
410+
optimal_tclk_seed = security.find_optimal_tclk_seed(devices.values(), tclk_seed)
411+
412+
if tclk_seed != optimal_tclk_seed:
413+
LOGGER.warning(
414+
"Provided TCLK seed %s is not optimal, using %s instead.",
415+
tclk_seed,
416+
optimal_tclk_seed,
417+
)
418+
419+
await self.nvram.osal_write(OsalNvIds.TCLK_SEED, optimal_tclk_seed)
420+
tclk_seed = optimal_tclk_seed
421+
422+
await security.write_devices(
411423
znp=self,
412424
devices=list(devices.values()),
413425
tclk_seed=tclk_seed,
414426
counter_increment=0,
415427
)
416428

417-
# If the provided TCLK seed isn't optimal, overwrite it
418-
if new_tclk_seed != tclk_seed:
419-
await self.nvram.osal_write(OsalNvIds.TCLK_SEED, new_tclk_seed)
420-
421429
if self.version == 1.2:
422430
await self.nvram.osal_write(
423431
OsalNvIds.HAS_CONFIGURED_ZSTACK1,

zigpy_znp/znp/security.py

Lines changed: 37 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -28,17 +28,17 @@ def rotate(lst: typing.Sequence, n: int) -> typing.Sequence:
2828
return lst[n:] + lst[:n]
2929

3030

31-
def compute_key(ieee: t.EUI64, tclk_seed: bytes, shift: int) -> t.KeyData:
31+
def compute_key(ieee: t.EUI64, tclk_seed: t.KeyData, shift: int) -> t.KeyData:
3232
rotated_tclk_seed = rotate(tclk_seed, n=shift)
3333
return t.KeyData([a ^ b for a, b in zip(rotated_tclk_seed, 2 * ieee.serialize())])
3434

3535

36-
def compute_tclk_seed(ieee: t.EUI64, key: t.KeyData, shift: int) -> bytes:
36+
def compute_tclk_seed(ieee: t.EUI64, key: t.KeyData, shift: int) -> t.KeyData:
3737
rotated_tclk_seed = bytes(a ^ b for a, b in zip(key, 2 * ieee.serialize()))
38-
return rotate(rotated_tclk_seed, n=-shift)
38+
return t.KeyData(rotate(rotated_tclk_seed, n=-shift))
3939

4040

41-
def find_key_shift(ieee: t.EUI64, key: t.KeyData, tclk_seed: bytes) -> int | None:
41+
def find_key_shift(ieee: t.EUI64, key: t.KeyData, tclk_seed: t.KeyData) -> int | None:
4242
for shift in range(0x00, 0x0F + 1):
4343
if tclk_seed == compute_tclk_seed(ieee, key, shift):
4444
return shift
@@ -47,20 +47,26 @@ def find_key_shift(ieee: t.EUI64, key: t.KeyData, tclk_seed: bytes) -> int | Non
4747

4848

4949
def count_seed_matches(
50-
ieees_and_keys: typing.Sequence[tuple[t.EUI64, t.KeyData]], tclk_seed: bytes
50+
keys: typing.Sequence[zigpy.state.Key], tclk_seed: t.KeyData
5151
) -> int:
52-
return sum(find_key_shift(i, k, tclk_seed) is not None for i, k in ieees_and_keys)
52+
count = 0
53+
54+
for key in keys:
55+
if find_key_shift(key.partner_ieee, key.key, tclk_seed) is not None:
56+
count += 1
57+
58+
return count
5359

5460

5561
def iter_seed_candidates(
56-
ieees_and_keys: typing.Sequence[tuple[t.EUI64, t.KeyData]]
62+
keys: typing.Sequence[zigpy.state.Key],
5763
) -> typing.Iterable[tuple[int, t.KeyData]]:
58-
for ieee, key in ieees_and_keys:
64+
for key in keys:
5965
# Derive a seed from each candidate. All rotations of a seed are equivalent.
60-
tclk_seed = t.KeyData(compute_tclk_seed(ieee, key, 0))
66+
tclk_seed = compute_tclk_seed(key.partner_ieee, key.key, 0)
6167

6268
# And see how many other keys share this same seed
63-
count = count_seed_matches(ieees_and_keys, tclk_seed)
69+
count = count_seed_matches(keys, tclk_seed)
6470

6571
yield count, tclk_seed
6672

@@ -167,7 +173,7 @@ async def read_addr_mgr_entries(znp: ZNP) -> typing.Sequence[t.AddrMgrEntry]:
167173

168174

169175
async def read_hashed_link_keys(
170-
znp: ZNP, tclk_seed: bytes
176+
znp: ZNP, tclk_seed: t.KeyData
171177
) -> typing.Iterable[zigpy.state.Key]:
172178
if znp.version >= 3.30:
173179
entries = znp.nvram.read_table(
@@ -352,37 +358,31 @@ async def write_addr_manager_entries(
352358
await znp.nvram.osal_write(OsalNvIds.ADDRMGR, t.AddressManagerTable(new_entries))
353359

354360

361+
def find_optimal_tclk_seed(
362+
devices: typing.Sequence[StoredDevice], tclk_seed: t.KeyData
363+
) -> t.KeyData:
364+
keys = [d.key for d in devices if d.key]
365+
366+
if not keys:
367+
return tclk_seed
368+
369+
best_count, best_seed = max(iter_seed_candidates(keys))
370+
tclk_count = count_seed_matches(keys, tclk_seed)
371+
assert tclk_count <= best_count
372+
373+
# Prefer the existing TCLK seed if it's as good as the others
374+
if tclk_count == best_count:
375+
return tclk_seed
376+
377+
return best_seed
378+
379+
355380
async def write_devices(
356381
znp: ZNP,
357382
devices: typing.Sequence[StoredDevice],
358383
counter_increment: t.uint32_t = 2500,
359384
tclk_seed: t.KeyData = None,
360385
) -> t.KeyData:
361-
ieees_and_keys = [(d.node_info.ieee, d.key.key) for d in devices if d.key]
362-
363-
# Find the tclk_seed that maximizes the number of keys that can be derived from it
364-
if ieees_and_keys:
365-
best_count, best_seed = max(iter_seed_candidates(ieees_and_keys))
366-
367-
# Check to see if the provided tclk_seed is also optimal
368-
if tclk_seed is not None:
369-
tclk_count = count_seed_matches(ieees_and_keys, tclk_seed)
370-
assert tclk_count <= best_count
371-
372-
if tclk_count < best_count:
373-
LOGGER.warning(
374-
"Provided TCLK seed %s only generates %d keys, but computed seed"
375-
" %s generates %d keys. Picking computed seed.",
376-
tclk_seed,
377-
tclk_count,
378-
best_seed,
379-
best_count,
380-
)
381-
else:
382-
best_seed = tclk_seed
383-
384-
tclk_seed = best_seed
385-
386386
hashed_link_key_table = []
387387
aps_key_data_table = []
388388
link_key_table = t.APSLinkKeyTable()
@@ -453,7 +453,7 @@ async def write_devices(
453453
await write_addr_manager_entries(znp, devices)
454454

455455
if old_link_key_table is None:
456-
return tclk_seed
456+
return
457457

458458
await znp.nvram.osal_write(OsalNvIds.APS_LINK_KEY_TABLE, new_link_key_table_value)
459459

@@ -498,5 +498,3 @@ async def write_devices(
498498
values=aps_key_data_table,
499499
fill_value=aps_key_data_fill_value,
500500
)
501-
502-
return tclk_seed

0 commit comments

Comments
 (0)