@@ -28,17 +28,17 @@ def rotate(lst: typing.Sequence, n: int) -> typing.Sequence:
28
28
return lst [n :] + lst [:n ]
29
29
30
30
31
- def compute_key (ieee : t .EUI64 , tclk_seed : bytes , shift : int ) -> t .KeyData :
31
+ def compute_key (ieee : t .EUI64 , tclk_seed : t . KeyData , shift : int ) -> t .KeyData :
32
32
rotated_tclk_seed = rotate (tclk_seed , n = shift )
33
33
return t .KeyData ([a ^ b for a , b in zip (rotated_tclk_seed , 2 * ieee .serialize ())])
34
34
35
35
36
- def compute_tclk_seed (ieee : t .EUI64 , key : t .KeyData , shift : int ) -> bytes :
36
+ def compute_tclk_seed (ieee : t .EUI64 , key : t .KeyData , shift : int ) -> t . KeyData :
37
37
rotated_tclk_seed = bytes (a ^ b for a , b in zip (key , 2 * ieee .serialize ()))
38
- return rotate (rotated_tclk_seed , n = - shift )
38
+ return t . KeyData ( rotate (rotated_tclk_seed , n = - shift ) )
39
39
40
40
41
- def find_key_shift (ieee : t .EUI64 , key : t .KeyData , tclk_seed : bytes ) -> int | None :
41
+ def find_key_shift (ieee : t .EUI64 , key : t .KeyData , tclk_seed : t . KeyData ) -> int | None :
42
42
for shift in range (0x00 , 0x0F + 1 ):
43
43
if tclk_seed == compute_tclk_seed (ieee , key , shift ):
44
44
return shift
@@ -47,20 +47,26 @@ def find_key_shift(ieee: t.EUI64, key: t.KeyData, tclk_seed: bytes) -> int | Non
47
47
48
48
49
49
def count_seed_matches (
50
- ieees_and_keys : typing .Sequence [tuple [ t . EUI64 , t . KeyData ]] , tclk_seed : bytes
50
+ keys : typing .Sequence [zigpy . state . Key ] , tclk_seed : t . KeyData
51
51
) -> int :
52
- return sum (find_key_shift (i , k , tclk_seed ) is not None for i , k in ieees_and_keys )
52
+ count = 0
53
+
54
+ for key in keys :
55
+ if find_key_shift (key .partner_ieee , key .key , tclk_seed ) is not None :
56
+ count += 1
57
+
58
+ return count
53
59
54
60
55
61
def iter_seed_candidates (
56
- ieees_and_keys : typing .Sequence [tuple [ t . EUI64 , t . KeyData ]]
62
+ keys : typing .Sequence [zigpy . state . Key ],
57
63
) -> typing .Iterable [tuple [int , t .KeyData ]]:
58
- for ieee , key in ieees_and_keys :
64
+ for key in keys :
59
65
# Derive a seed from each candidate. All rotations of a seed are equivalent.
60
- tclk_seed = t . KeyData ( compute_tclk_seed (ieee , key , 0 ) )
66
+ tclk_seed = compute_tclk_seed (key . partner_ieee , key . key , 0 )
61
67
62
68
# And see how many other keys share this same seed
63
- count = count_seed_matches (ieees_and_keys , tclk_seed )
69
+ count = count_seed_matches (keys , tclk_seed )
64
70
65
71
yield count , tclk_seed
66
72
@@ -167,7 +173,7 @@ async def read_addr_mgr_entries(znp: ZNP) -> typing.Sequence[t.AddrMgrEntry]:
167
173
168
174
169
175
async def read_hashed_link_keys (
170
- znp : ZNP , tclk_seed : bytes
176
+ znp : ZNP , tclk_seed : t . KeyData
171
177
) -> typing .Iterable [zigpy .state .Key ]:
172
178
if znp .version >= 3.30 :
173
179
entries = znp .nvram .read_table (
@@ -352,37 +358,31 @@ async def write_addr_manager_entries(
352
358
await znp .nvram .osal_write (OsalNvIds .ADDRMGR , t .AddressManagerTable (new_entries ))
353
359
354
360
361
+ def find_optimal_tclk_seed (
362
+ devices : typing .Sequence [StoredDevice ], tclk_seed : t .KeyData
363
+ ) -> t .KeyData :
364
+ keys = [d .key for d in devices if d .key ]
365
+
366
+ if not keys :
367
+ return tclk_seed
368
+
369
+ best_count , best_seed = max (iter_seed_candidates (keys ))
370
+ tclk_count = count_seed_matches (keys , tclk_seed )
371
+ assert tclk_count <= best_count
372
+
373
+ # Prefer the existing TCLK seed if it's as good as the others
374
+ if tclk_count == best_count :
375
+ return tclk_seed
376
+
377
+ return best_seed
378
+
379
+
355
380
async def write_devices (
356
381
znp : ZNP ,
357
382
devices : typing .Sequence [StoredDevice ],
358
383
counter_increment : t .uint32_t = 2500 ,
359
384
tclk_seed : t .KeyData = None ,
360
385
) -> t .KeyData :
361
- ieees_and_keys = [(d .node_info .ieee , d .key .key ) for d in devices if d .key ]
362
-
363
- # Find the tclk_seed that maximizes the number of keys that can be derived from it
364
- if ieees_and_keys :
365
- best_count , best_seed = max (iter_seed_candidates (ieees_and_keys ))
366
-
367
- # Check to see if the provided tclk_seed is also optimal
368
- if tclk_seed is not None :
369
- tclk_count = count_seed_matches (ieees_and_keys , tclk_seed )
370
- assert tclk_count <= best_count
371
-
372
- if tclk_count < best_count :
373
- LOGGER .warning (
374
- "Provided TCLK seed %s only generates %d keys, but computed seed"
375
- " %s generates %d keys. Picking computed seed." ,
376
- tclk_seed ,
377
- tclk_count ,
378
- best_seed ,
379
- best_count ,
380
- )
381
- else :
382
- best_seed = tclk_seed
383
-
384
- tclk_seed = best_seed
385
-
386
386
hashed_link_key_table = []
387
387
aps_key_data_table = []
388
388
link_key_table = t .APSLinkKeyTable ()
@@ -453,7 +453,7 @@ async def write_devices(
453
453
await write_addr_manager_entries (znp , devices )
454
454
455
455
if old_link_key_table is None :
456
- return tclk_seed
456
+ return
457
457
458
458
await znp .nvram .osal_write (OsalNvIds .APS_LINK_KEY_TABLE , new_link_key_table_value )
459
459
@@ -498,5 +498,3 @@ async def write_devices(
498
498
values = aps_key_data_table ,
499
499
fill_value = aps_key_data_fill_value ,
500
500
)
501
-
502
- return tclk_seed
0 commit comments